[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread xiongqi wu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503847#comment-17503847
 ] 

xiongqi wu commented on KAFKA-13723:


[~junrao]  Hi Jun, this function is supposed to capture violations that have 
already passed the max compaction lag. 

For example:

If maxCompactionDelay > 0, the policy has been violated (i.e., the log was not 
compacted within the configured max.compaction.lag.ms time), and the log 
should be compacted immediately. 

If maxCompactionDelay = 0, no violation was found, and the log doesn't need to 
be compacted immediately. 

 

The name is a little misleading.  maxCompactionDelay doesn't mean the log 
cleaner should delay compaction. Instead, it means the delay has already 
happened, and the log should be cleaned immediately. 
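A minimal, self-contained sketch of that semantic (simplified inputs and 
illustrative names, not the actual LogCleanerManager code): the returned value 
is how far *past* the max.compaction.lag.ms deadline the earliest dirty 
segment already is, so any positive result means "compact now" and 0 means the 
deadline has not been violated.

{code:java}
// Illustrative sketch only, not Kafka's implementation.
object MaxCompactionDelaySketch {
  def maxCompactionDelay(nowMs: Long,
                         earliestDirtySegmentTimestampMs: Long,
                         maxCompactionLagMs: Long): Long = {
    val cleanUntilTime = nowMs - math.max(maxCompactionLagMs, 0L)
    // time already spent past the deadline; 0 if the deadline is not violated
    math.max(cleanUntilTime - earliestDirtySegmentTimestampMs, 0L)
  }

  def needCompactionNow(delayMs: Long): Boolean = delayMs > 0

  def main(args: Array[String]): Unit = {
    val dayMs = 24L * 60 * 60 * 1000
    val now = System.currentTimeMillis()
    // earliest dirty segment is 8 days old, max.compaction.lag.ms is 7 days -> ~1 day overdue
    val delay = maxCompactionDelay(now, now - 8 * dayMs, 7 * dayMs)
    println(s"overdue by ${delay / dayMs} day(s), compact now: ${needCompactionNow(delay)}")
  }
}
{code}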

 

 

> max.compaction.lag.ms implemented incorrectly
> -
>
> Key: KAFKA-13723
> URL: https://issues.apache.org/jira/browse/KAFKA-13723
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Jun Rao
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
> max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
> time. 
>  
> The implementation in LogCleanerManager has the following code. The path for 
> earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
> it seems that we should set the delay to 0 so that we could trigger cleaning 
> immediately since the segment has been dirty for longer than 
> max.compaction.lag.ms. 
>  
>  
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = {
>   ...
>   val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
>   val cleanUntilTime = now - maxCompactionLagMs
>   if (earliestDirtySegmentTimestamp < cleanUntilTime)
>     cleanUntilTime - earliestDirtySegmentTimestamp
>   else
>     0L
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-10806) throwable from user callback on completeFutureAndFireCallbacks can lead to unhandled exceptions

2020-12-03 Thread xiongqi wu (Jira)
xiongqi wu created KAFKA-10806:
--

 Summary: throwable from user callback on 
completeFutureAndFireCallbacks can lead to unhandled exceptions
 Key: KAFKA-10806
 URL: https://issues.apache.org/jira/browse/KAFKA-10806
 Project: Kafka
  Issue Type: Bug
Reporter: xiongqi wu


When the Kafka producer tries to complete or abort a batch, it invokes the 
user callback. However, "completeFutureAndFireCallbacks" only catches 
exceptions from the user callback, not all throwables.  An uncaught throwable 
can prevent the batch from being freed.  
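A minimal, self-contained sketch of the pattern this ticket asks for 
(illustrative names, not the actual producer code): the user callbacks are 
invoked under a catch of Throwable rather than Exception, and the release step 
runs in a finally block so the batch buffer is always freed.

{code:java}
// Illustrative sketch: tolerate any Throwable from user callbacks and always free the batch.
object SafeCallbackSketch {
  def completeAndFire(callbacks: Seq[() => Unit], deallocateBatch: () => Unit): Unit = {
    try {
      callbacks.foreach { cb =>
        try cb()
        catch { case t: Throwable => println(s"user callback threw, ignoring: $t") }
      }
    } finally {
      deallocateBatch() // always runs, even if a callback throws an Error
    }
  }

  def main(args: Array[String]): Unit = {
    completeAndFire(
      Seq(() => throw new AssertionError("boom"), () => println("second callback still runs")),
      () => println("batch deallocated")
    )
  }
}
{code}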



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8527) add dynamic maintenance broker config

2019-06-11 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-8527:
-

 Summary: add dynamic maintenance broker config
 Key: KAFKA-8527
 URL: https://issues.apache.org/jira/browse/KAFKA-8527
 Project: Kafka
  Issue Type: Improvement
Reporter: xiongqi wu
Assignee: xiongqi wu


Before we remove a broker for maintenance, we want to move all partitions off 
the broker first to avoid introducing new Under Replicated Partitions (URPs). 
That is because shutting down (or killing) a broker that still hosts live 
partitions temporarily reduces the number of replicas of those partitions. 
Moving partitions off a broker can be done via partition reassignment. 
However, during the partition reassignment process, new topics can be created 
by Kafka, and thereby new partitions can be added to the broker that is 
pending removal. As a result, the removal process needs to repeatedly move new 
topic partitions off the maintenance broker. In a production environment in 
which topic creation is frequent and URPs caused by broker removal cannot be 
tolerated, the removal process can take multiple iterations to complete the 
partition reassignment.  We want to provide a mechanism to mark a broker as a 
maintenance broker (via a cluster-level dynamic configuration). One action 
Kafka can take for a maintenance broker is to not assign new topic partitions 
to it, thereby facilitating the broker removal.
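A minimal, self-contained sketch of the idea (not Kafka's actual assignment 
code; the names and the round-robin placement are illustrative): brokers 
flagged as maintenance brokers are filtered out of the candidate list before 
replicas for a new topic are placed.

{code:java}
// Illustrative sketch: exclude maintenance brokers from new-topic replica assignment.
object MaintenanceAwareAssignmentSketch {
  def assignReplicas(allBrokers: Seq[Int],
                     maintenanceBrokers: Set[Int],
                     partitions: Int,
                     replicationFactor: Int): Map[Int, Seq[Int]] = {
    val candidates = allBrokers.filterNot(maintenanceBrokers.contains)
    require(candidates.size >= replicationFactor, "not enough non-maintenance brokers")
    // simple round-robin placement, just to show that maintenance brokers never appear
    (0 until partitions).map { p =>
      p -> (0 until replicationFactor).map(r => candidates((p + r) % candidates.size))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // broker 3 is being drained for removal; new topic partitions avoid it
    println(assignReplicas(allBrokers = Seq(1, 2, 3, 4), maintenanceBrokers = Set(3),
                           partitions = 4, replicationFactor = 2))
  }
}
{code}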



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8249) partition reassignment may never finish if topic deletion completes first

2019-04-17 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-8249:
-

 Summary: partition reassignment may never finish if topic deletion 
completes first 
 Key: KAFKA-8249
 URL: https://issues.apache.org/jira/browse/KAFKA-8249
 Project: Kafka
  Issue Type: Bug
Reporter: xiongqi wu
Assignee: xiongqi wu


Kafka allows topic deletion to complete successfully while there are pending 
partition reassignments for the same topic (if the topic deletion request 
comes after the partition reassignment). 

This leads to several issues: 1) pending partition reassignments of a deleted 
topic never complete because the topic is gone; 2) onPartitionReassignment -> 
updateAssignedReplicasForPartition throws an IllegalStateException for the 
non-existing node, which in turn causes the controller to not resume topic 
deletion for online brokers and to fail to register broker notification 
handlers (etc.) during onBrokerStartup. 

To fix this, we need to clean up pending partition reassignments during topic 
deletion.
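A minimal, self-contained sketch of that cleanup (simplified stand-in types, 
not the controller's actual code): when a topic enters deletion, any of its 
partitions are dropped from the pending-reassignment set so reassignment never 
waits on a topic that no longer exists.

{code:java}
// Illustrative sketch: cancel pending reassignments for a topic that is being deleted.
object CancelReassignmentOnDeleteSketch {
  final case class TopicPartition(topic: String, partition: Int)

  def onTopicDeletion(topic: String,
                      pendingReassignments: Map[TopicPartition, Seq[Int]]): Map[TopicPartition, Seq[Int]] = {
    val (dropped, kept) = pendingReassignments.partition { case (tp, _) => tp.topic == topic }
    if (dropped.nonEmpty)
      println(s"cancelling ${dropped.size} pending reassignment(s) for deleted topic $topic")
    kept
  }

  def main(args: Array[String]): Unit = {
    val pending = Map(TopicPartition("foo", 0) -> Seq(1, 2), TopicPartition("bar", 0) -> Seq(2, 3))
    println(onTopicDeletion("foo", pending)) // only bar-0 remains pending
  }
}
{code}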

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-04-01 Thread xiongqi wu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16807191#comment-16807191
 ] 

xiongqi wu commented on KAFKA-7362:
---

[~dhruvilshah]  Yes, I agree these two (topic deletion and orphan partition 
cleanup) are two different problems and need to be addressed separately. But 
when we design the mechanism to clean up orphan partitions, we want to keep 
the topic deletion use case in mind. 

Regarding the KIP, I am totally fine with fixing it without a KIP as long as 
the committers are ok with it. 

I had a draft implementation before: 

[https://github.com/xiowu0/kafka/commit/f1bd3085639f41a7af02567550a8e3018cfac3e9]

This implementation doesn't clean up the orphan partitions immediately but 
still waits for the default retention time to pass before removing them.  It's 
also a one-time action after broker boot-up. 

Alternatively, we could trigger the orphan partition removal after every 
ZooKeeper session time-out/new session.
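A rough, self-contained sketch of the boot-up approach described above 
(illustrative names, not the linked implementation): the broker compares the 
partition directories it has on disk with the partitions it is actually 
assigned, and only directories unmodified for longer than the retention window 
are reported as deletion candidates.

{code:java}
// Illustrative sketch: find on-disk partition directories that are not assigned to this
// broker and are older than the retention window.
import java.io.File

object OrphanPartitionSketch {
  def orphanDirs(logDirs: Seq[File], assignedPartitions: Set[String]): Seq[File] =
    logDirs
      .flatMap(dir => Option(dir.listFiles()).toSeq.flatten)
      .filter(_.isDirectory)
      .filterNot(d => assignedPartitions.contains(d.getName)) // dir names look like "mytopic-0"

  def main(args: Array[String]): Unit = {
    val retentionMs = 7L * 24 * 60 * 60 * 1000
    val now = System.currentTimeMillis()
    val candidates = orphanDirs(Seq(new File("/tmp/kafka-logs")), assignedPartitions = Set("mytopic-0"))
    // only directories whose last modification is older than the retention window qualify
    candidates.filter(d => now - d.lastModified() > retentionMs)
      .foreach(d => println(s"would schedule orphan partition dir ${d.getPath} for deletion"))
  }
}
{code}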

 

 

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up either the data 
> or the folders that belong to orphan partitions.  The log manager will scan 
> all log dirs during startup, but the time-based retention policy on a topic 
> partition will not kick in until the broker is either a follower or a leader 
> of the partition.  In addition, we do not have logic to delete folders that 
> belong to orphan partitions today. 
> Opening this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-03-28 Thread xiongqi wu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16804577#comment-16804577
 ] 

xiongqi wu commented on KAFKA-7362:
---

Yes, I am still looking forward to fixing this issue.

I put this item on hold because:

1)  there is no active discussion on KIP-370. 

2)  I am also thinking about combining this with topic deletion. Today a topic 
can only be deleted when all related brokers are online.  This causes topic 
deletion to get stuck if a broker fails and never comes back.  I was thinking 
of combining this feature with a change to the topic deletion state machine so 
that topic deletion can always succeed and orphan partitions can be safely 
removed when offline brokers come back online.

I will update the ticket and KIP-370 after I think a little more about item 2. 
 

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up either the data 
> or the folders that belong to orphan partitions.  The log manager will scan 
> all log dirs during startup, but the time-based retention policy on a topic 
> partition will not kick in until the broker is either a follower or a leader 
> of the partition.  In addition, we do not have logic to delete folders that 
> belong to orphan partitions today. 
> Opening this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7650) make "auto.create.topics.enable" dynamically configurable.

2018-11-16 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7650:
-

 Summary: make "auto.create.topics.enable"  dynamically 
configurable. 
 Key: KAFKA-7650
 URL: https://issues.apache.org/jira/browse/KAFKA-7650
 Project: Kafka
  Issue Type: Improvement
Reporter: xiongqi wu
Assignee: xiongqi wu


There are several use cases for making "auto.create.topics.enable" dynamically 
configurable. 

For example:
1) a wildcard consumer can recreate deleted topics; 
2) we have also seen a misconfigured consumer that consumes from the wrong 
cluster end up creating a lot of zombie topics in the target cluster. 

In such cases, we may want to temporarily disable "auto.create.topics.enable" 
and re-enable topic creation later, after the problem is solved, without 
restarting brokers. 
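A hypothetical usage sketch of what toggling this setting could look like 
through the Admin client *if* the proposal lands; today the broker treats 
"auto.create.topics.enable" as a static config and would reject such an 
update, so the snippet only illustrates the cluster-level dynamic-config 
mechanism (the broker address is an assumption).

{code:java}
// Hypothetical sketch: update a cluster-level dynamic broker config via the Admin client.
import java.util.Properties
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object ToggleAutoCreateSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    val admin = Admin.create(props)
    try {
      // an empty broker name addresses the cluster-wide default config entry
      val resource = new ConfigResource(ConfigResource.Type.BROKER, "")
      val op = new AlterConfigOp(new ConfigEntry("auto.create.topics.enable", "false"),
                                 AlterConfigOp.OpType.SET)
      val configs = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
      configs.put(resource, java.util.Collections.singletonList(op))
      admin.incrementalAlterConfigs(configs).all().get()
    } finally admin.close()
  }
}
{code}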

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response

2018-10-12 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7501:
-

 Summary: double deallocation of producer batch upon expiration of 
inflight requests and error response
 Key: KAFKA-7501
 URL: https://issues.apache.org/jira/browse/KAFKA-7501
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: xiongqi wu
Assignee: xiongqi wu


The following event sequence leads to double deallocation of a producer batch:

1) a producer batch is sent and the response is not received. 

2) the in-flight producer batch expires when deliveryTimeoutMs is reached.  
The sender fails the producer batch via "failBatch" and the batch is 
deallocated via "accumulator.deallocate(batch)". 

3) the response for the batch finally arrives after the batch has expired, and 
the response contains the error "Errors.MESSAGE_TOO_LARGE".

4) the producer batch is split and the original batch is deallocated a second 
time. As a result, an "IllegalStateException" is raised. 
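A minimal, self-contained sketch of the kind of guard that would prevent this 
double free (illustrative names, not the actual ProducerBatch code): the batch 
records that it has already been finalized, so a late error response cannot 
deallocate it a second time.

{code:java}
// Illustrative sketch: make deallocation idempotent so the expiration path and a late
// error-response path cannot both free the same batch.
import java.util.concurrent.atomic.AtomicBoolean

object IdempotentDeallocateSketch {
  final class Batch(val id: Int) {
    private val finalized = new AtomicBoolean(false)
    // true only for the first caller; later attempts become no-ops
    def tryFinalize(): Boolean = finalized.compareAndSet(false, true)
  }

  def deallocate(batch: Batch): Unit =
    if (batch.tryFinalize()) println(s"batch ${batch.id} deallocated")
    else println(s"batch ${batch.id} was already deallocated, skipping")

  def main(args: Array[String]): Unit = {
    val b = new Batch(1)
    deallocate(b) // delivery-timeout expiration path
    deallocate(b) // late MESSAGE_TOO_LARGE response path, now a safe no-op
  }
}
{code}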



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2018-08-30 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7362:
-

 Summary: enable kafka broker to remove orphan partitions 
automatically 
 Key: KAFKA-7362
 URL: https://issues.apache.org/jira/browse/KAFKA-7362
 Project: Kafka
  Issue Type: Improvement
  Components: core, log
Reporter: xiongqi wu
Assignee: xiongqi wu


When partition reassignment removes topic partitions from an offline broker, 
those removed partitions become orphan partitions to the broker. When the 
offline broker comes back online, it is not able to clean up either the data 
or the folders that belong to orphan partitions.  The log manager will scan 
all log dirs during startup, but the time-based retention policy on a topic 
partition will not kick in until the broker is either a follower or a leader 
of the partition.  In addition, we do not have logic to delete folders that 
belong to orphan partitions today. 

Opening this ticket to provide a mechanism (when enabled) to safely remove 
orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7322) race between compaction thread and retention thread when changing topic cleanup policy

2018-08-22 Thread xiongqi wu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589199#comment-16589199
 ] 

xiongqi wu commented on KAFKA-7322:
---

[~dhruvilshah]  I believe this race condition is different from the race in 
KAFKA-7278.  This race is caused by the compaction thread reading from a log 
segment while the retention thread deletes that segment. 

Ideally, we should not let the compaction and retention threads work on the 
same partition. 

> race between compaction thread and retention thread when changing topic 
> cleanup policy
> --
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> The deletion thread grabs the log.lock when it tries to rename a log segment 
> and schedule it for actual deletion.
> The compaction thread only grabs the log.lock when it tries to replace the 
> original segments with the cleaned segment. It doesn't hold the lock while 
> it reads records from the original segments to build the offset map and the 
> new segments. As a result, if the deletion and compaction threads both work 
> on the same log partition, we have a race condition. 
> This race happens when the topic cleanup policy is updated on the fly.  
> One sequence that hits this race condition:
> 1: the topic cleanup policy is "compact" initially 
> 2: the log cleaner (compaction) thread picks up the partition for compaction, 
> and the compaction is still in progress
> 3: the topic cleanup policy is updated to "delete"
> 4: the retention thread picks up the topic partition and deletes some old 
> segments
> 5: the log cleaner thread reads from the deleted segments and raises an IO 
> exception 
>  
> The proposed solution is to use the "inProgress" map that the cleaner manager 
> already has to protect against such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7322) race between compaction thread and retention thread when changing topic cleanup policy

2018-08-21 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7322:
-

 Summary: race between compaction thread and retention thread when 
changing topic cleanup policy
 Key: KAFKA-7322
 URL: https://issues.apache.org/jira/browse/KAFKA-7322
 Project: Kafka
  Issue Type: Bug
  Components: log
Reporter: xiongqi wu
Assignee: xiongqi wu


The deletion thread grabs the log.lock when it tries to rename a log segment 
and schedule it for actual deletion.

The compaction thread only grabs the log.lock when it tries to replace the 
original segments with the cleaned segment. It doesn't hold the lock while it 
reads records from the original segments to build the offset map and the new 
segments. As a result, if the deletion and compaction threads both work on the 
same log partition, we have a race condition. 

This race happens when the topic cleanup policy is updated on the fly. 

One sequence that hits this race condition:

1: the topic cleanup policy is "compact" initially 

2: the log cleaner (compaction) thread picks up the partition for compaction, 
and the compaction is still in progress

3: the topic cleanup policy is updated to "delete"

4: the retention thread picks up the topic partition and deletes some old 
segments

5: the log cleaner thread reads from the deleted segments and raises an IO 
exception 

 

The proposed solution is to use the "inProgress" map that the cleaner manager 
already has to protect against such a race.
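A minimal, self-contained sketch of the proposed guard (simplified stand-in 
types and names, not LogCleanerManager itself): a shared in-progress set lets 
the retention path skip partitions the cleaner is currently compacting, which 
closes the race described above.

{code:java}
// Illustrative sketch: coordinate compaction and retention through a shared
// "in progress" set so they never operate on the same partition concurrently.
import scala.collection.mutable

object CleanupCoordinationSketch {
  final case class TopicPartition(topic: String, partition: Int)

  private val inProgress = mutable.Set.empty[TopicPartition]
  private val lock = new Object

  // the cleaner claims a partition before reading its segments
  def tryStartCompaction(tp: TopicPartition): Boolean = lock.synchronized {
    inProgress.add(tp) // false if some thread already holds it
  }

  def finishCompaction(tp: TopicPartition): Unit = lock.synchronized {
    inProgress.remove(tp)
  }

  // the retention thread skips (or retries later) partitions being compacted
  def safeToApplyRetention(tp: TopicPartition): Boolean = lock.synchronized {
    !inProgress.contains(tp)
  }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("events", 0)
    tryStartCompaction(tp)
    println(s"retention may delete segments of $tp: ${safeToApplyRetention(tp)}") // false
    finishCompaction(tp)
    println(s"retention may delete segments of $tp: ${safeToApplyRetention(tp)}") // true
  }
}
{code}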

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-08-21 Thread xiongqi wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiongqi wu reassigned KAFKA-7321:
-

Assignee: xiongqi wu

> ensure timely processing of deletion requests in Kafka topic (Time-based log 
> compaction)
> 
>
> Key: KAFKA-7321
> URL: https://issues.apache.org/jira/browse/KAFKA-7321
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> _Compaction enables Kafka to remove old messages that are flagged for 
> deletion while other messages can be retained for a relatively longer time.  
> Today, a log segment may remain un-compacted for a long time since the 
> eligibility for log compaction is determined by the compaction ratio 
> ("min.cleanable.dirty.ratio") and min compaction lag 
> ("min.compaction.lag.ms") settings.  The ability to delete a log message 
> through compaction in a timely manner has become an important requirement in 
> some use cases (e.g., GDPR).  For example, one use case is to delete PII 
> (Personally Identifiable Information) data within 7 days while keeping 
> non-PII data indefinitely in compacted format.  The goal of this change is 
> to provide a time-based compaction policy that ensures the cleanable section 
> is compacted after the specified time interval regardless of the dirty ratio 
> and "min compaction lag".  However, the dirty ratio and "min compaction lag" 
> are still honored if the time-based compaction rule is not violated. In 
> other words, if Kafka receives a deletion request on a key (e.g., a key with 
> a null value), the corresponding log segment will be picked up for 
> compaction after the configured time interval to remove the key._
>  
> _This is to track effort in KIP 354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-08-21 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7321:
-

 Summary: ensure timely processing of deletion requests in Kafka 
topic (Time-based log compaction)
 Key: KAFKA-7321
 URL: https://issues.apache.org/jira/browse/KAFKA-7321
 Project: Kafka
  Issue Type: Improvement
  Components: log
Reporter: xiongqi wu


_Compaction enables Kafka to remove old messages that are flagged for deletion 
while other messages can be retained for a relatively longer time.  Today, a 
log segment may remain un-compacted for a long time since the eligibility for 
log compaction is determined by the compaction ratio 
("min.cleanable.dirty.ratio") and min compaction lag ("min.compaction.lag.ms") 
settings.  The ability to delete a log message through compaction in a timely 
manner has become an important requirement in some use cases (e.g., GDPR).  
For example, one use case is to delete PII (Personally Identifiable 
Information) data within 7 days while keeping non-PII data indefinitely in 
compacted format.  The goal of this change is to provide a time-based 
compaction policy that ensures the cleanable section is compacted after the 
specified time interval regardless of the dirty ratio and "min compaction 
lag".  However, the dirty ratio and "min compaction lag" are still honored if 
the time-based compaction rule is not violated. In other words, if Kafka 
receives a deletion request on a key (e.g., a key with a null value), the 
corresponding log segment will be picked up for compaction after the 
configured time interval to remove the key._

 

_This is to track effort in KIP 354:_

_https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_
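A minimal, self-contained sketch of the selection rule the KIP describes 
(simplified inputs and illustrative names, not Kafka's actual 
LogCleanerManager code): a log becomes eligible for compaction once its 
earliest dirty segment is older than max.compaction.lag.ms, even when the 
dirty-ratio check alone would defer it.

{code:java}
// Illustrative sketch of the time-based eligibility rule proposed in KIP-354.
object TimeBasedCompactionSketch {
  def overdue(nowMs: Long, earliestDirtySegmentTimestampMs: Long, maxCompactionLagMs: Long): Boolean =
    nowMs - earliestDirtySegmentTimestampMs > maxCompactionLagMs

  // eligible when the dirty-ratio threshold is met OR the max compaction lag is violated
  def eligibleForCompaction(nowMs: Long,
                            earliestDirtySegmentTimestampMs: Long,
                            maxCompactionLagMs: Long,
                            dirtyRatio: Double,
                            minCleanableDirtyRatio: Double): Boolean =
    dirtyRatio >= minCleanableDirtyRatio ||
      overdue(nowMs, earliestDirtySegmentTimestampMs, maxCompactionLagMs)

  def main(args: Array[String]): Unit = {
    val dayMs = 24L * 60 * 60 * 1000
    val now = System.currentTimeMillis()
    // dirty ratio is low, but the earliest dirty segment is 8 days old with a 7-day lag limit,
    // so the log is picked up for compaction anyway
    println(eligibleForCompaction(now, now - 8 * dayMs, 7 * dayMs,
                                  dirtyRatio = 0.05, minCleanableDirtyRatio = 0.5))
  }
}
{code}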



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)