[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503847#comment-17503847 ]

xiongqi wu commented on KAFKA-13723:
------------------------------------

[~junrao] Hi Jun, this function is supposed to capture violations that have already passed the max compaction lag. E.g., if maxCompactionDelay > 0, the policy has been violated (the log was not compacted within the configured max compaction time), and the log should be compacted immediately. If maxCompactionDelay = 0, no violation was found, and the log doesn't need to be compacted immediately.

The name is a little misleading. maxCompactionDelay doesn't mean the log cleaner should delay compaction. Instead, it means the delay has already happened, and the log should be cleaned immediately.

> max.compaction.lag.ms implemented incorrectly
> ---------------------------------------------
>
>                 Key: KAFKA-13723
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13723
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.3.0
>            Reporter: Jun Rao
>            Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced max.compaction.lag.ms to guarantee that a record be cleaned before a certain time.
>
> The implementation in LogCleanerManager has the following code. The path for earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, it seems that we should set the delay to 0 so that we could trigger cleaning immediately since the segment has been dirty for longer than max.compaction.lag.ms.
>
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = {
>   ...
>   val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
>   val cleanUntilTime = now - maxCompactionLagMs
>   if (earliestDirtySegmentTimestamp < cleanUntilTime)
>     cleanUntilTime - earliestDirtySegmentTimestamp
>   else
>     0L
> }{code}

--
This message was sent by Atlassian Jira (v8.20.1#820001)
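To make the semantics in the comment concrete, here is a minimal, illustrative Java sketch (not Kafka's actual Scala code; the parameters stand in for values LogCleanerManager derives from the log): a positive return value means the first dirty segment is already overdue by that many milliseconds and should be compacted immediately, while 0 means no violation.

```java
public class MaxCompactionDelay {
    // earliestDirtySegmentTimestamp, maxCompactionLagMs and now are stand-ins
    // for values the real LogCleanerManager computes; names are illustrative.
    static long maxCompactionDelay(long earliestDirtySegmentTimestamp,
                                   long maxCompactionLagMs, long now) {
        long lag = Math.max(maxCompactionLagMs, 0L);
        long cleanUntilTime = now - lag;
        if (earliestDirtySegmentTimestamp < cleanUntilTime)
            // violation: the segment is overdue by this many ms -> compact now
            return cleanUntilTime - earliestDirtySegmentTimestamp;
        else
            return 0L; // no violation: nothing needs to be forced
    }
}
```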
[jira] [Created] (KAFKA-10806) throwable from user callback on completeFutureAndFireCallbacks can lead to unhandled exceptions
xiongqi wu created KAFKA-10806:
----------------------------------

             Summary: throwable from user callback on completeFutureAndFireCallbacks can lead to unhandled exceptions
                 Key: KAFKA-10806
                 URL: https://issues.apache.org/jira/browse/KAFKA-10806
             Project: Kafka
          Issue Type: Bug
            Reporter: xiongqi wu

When the Kafka producer tries to complete or abort a batch, it invokes the user callback. However, "completeFutureAndFireCallbacks" only catches exceptions from the user callback, not all throwables. An uncaught throwable can prevent the batch from being freed.
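A hedged sketch of the kind of fix implied here (illustrative names, not Kafka's real classes): catch Throwable rather than Exception around the user callback, and free the batch in a finally block so even an Error thrown by the callback cannot leak it.

```java
public class CallbackGuard {
    static boolean freed = false;

    // completeFutureAndFireCallbacks-style guard: catching Throwable, not just
    // Exception, means even an Error from the user callback cannot prevent
    // the batch from being freed.
    static void fireCallbackAndFree(Runnable userCallback) {
        try {
            userCallback.run();
        } catch (Throwable t) {
            // log and swallow: a callback failure must not abort batch cleanup
        } finally {
            freed = true; // stand-in for accumulator.deallocate(batch)
        }
    }
}
```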
[jira] [Created] (KAFKA-8527) add dynamic maintenance broker config
xiongqi wu created KAFKA-8527:
---------------------------------

             Summary: add dynamic maintenance broker config
                 Key: KAFKA-8527
                 URL: https://issues.apache.org/jira/browse/KAFKA-8527
             Project: Kafka
          Issue Type: Improvement
            Reporter: xiongqi wu
            Assignee: xiongqi wu

Before we remove a broker for maintenance, we want to move all partitions off the broker first to avoid introducing new Under Replicated Partitions (URPs). That is because shutting down (or killing) a broker that still hosts live partitions temporarily reduces the replica count of those partitions. Moving partitions off a broker can be done via partition reassignment. However, during the partition reassignment process, new topics can be created by Kafka, and thereby new partitions can be added to the broker that is pending removal. As a result, the removal process needs to recursively move new topic partitions off the maintenance broker. In a production environment where topic creation is frequent and URPs caused by broker removal cannot be tolerated, the removal process can take multiple iterations to complete the partition reassignment.

We want to provide a mechanism to mark a broker as a maintenance broker (via cluster-level dynamic configuration). One action Kafka can take for a maintenance broker is to not assign new topic partitions to it, thereby facilitating the broker's removal.
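The proposed behavior could look like the following illustrative sketch (not Kafka's actual assignment code): when picking replicas for a new partition, brokers flagged as maintenance brokers via the dynamic cluster-level config are simply excluded from the candidate list.

```java
import java.util.*;

public class MaintenanceAwareAssignment {
    // Illustrative only: real Kafka assignment also balances racks and load.
    static List<Integer> assignReplicas(List<Integer> liveBrokers,
                                        Set<Integer> maintenanceBrokers,
                                        int replicationFactor) {
        List<Integer> candidates = new ArrayList<>();
        for (int b : liveBrokers)
            if (!maintenanceBrokers.contains(b)) // skip brokers pending removal
                candidates.add(b);
        if (candidates.size() < replicationFactor)
            throw new IllegalStateException("not enough non-maintenance brokers");
        return candidates.subList(0, replicationFactor);
    }
}
```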
[jira] [Created] (KAFKA-8249) partition reassignment may never finish if topic deletion completes first
xiongqi wu created KAFKA-8249:
---------------------------------

             Summary: partition reassignment may never finish if topic deletion completes first
                 Key: KAFKA-8249
                 URL: https://issues.apache.org/jira/browse/KAFKA-8249
             Project: Kafka
          Issue Type: Bug
            Reporter: xiongqi wu
            Assignee: xiongqi wu

Kafka allows topic deletion to complete successfully while there are pending partition reassignments for the same topic (if the topic deletion request comes after the partition reassignment). This leads to several issues:

1) pending partition reassignments of the deleted topic never complete, because the topic is deleted.
2) onPartitionReassignment -> updateAssignedReplicasForPartition throws an IllegalStateException for the non-existing node. This in turn causes the controller not to resume topic deletion for online brokers, and also to fail to register broker notification handlers (etc.) during onBrokerStartup.

To fix this, we need to clean up pending partition reassignments during topic deletion.
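The proposed cleanup can be sketched as follows (illustrative data shapes, not the controller's real state): on topic deletion, drop every pending reassignment whose partition belongs to the deleted topic.

```java
import java.util.*;

public class ReassignmentCleanup {
    // pending maps "topic-partition" strings (e.g. "foo-0") to target replica
    // lists; this shape is illustrative, not the controller's actual structure.
    static void removeReassignmentsForTopic(Map<String, List<Integer>> pending,
                                            String topic) {
        // strip the trailing "-<partition>" suffix and compare the topic name
        pending.keySet().removeIf(
            tp -> tp.substring(0, tp.lastIndexOf('-')).equals(topic));
    }
}
```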
[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16807191#comment-16807191 ]

xiongqi wu commented on KAFKA-7362:
-----------------------------------

[~dhruvilshah] Yes, I agree these two (topic deletion and orphan partition cleanup) are two different problems and need to be addressed separately. But when we design the mechanism to clean up orphan partitions, we want to keep the topic deletion use case in mind.

Regarding the KIP, I am totally fine with fixing it without a KIP as long as the committers are OK with it. I had a draft implementation before: [https://github.com/xiowu0/kafka/commit/f1bd3085639f41a7af02567550a8e3018cfac3e9]

This draft solution doesn't clean up the orphan partitions immediately, but still waits for the default retention time to pass before removing them. It is also a one-time action after broker boot-up. Alternatively, we could trigger the orphan partition removal after every ZooKeeper timeout/new session.

> enable kafka broker to remove orphan partitions automatically
> -------------------------------------------------------------
>
>                 Key: KAFKA-7362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7362
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core, log
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, those removed partitions become orphan partitions to the broker. When the offline broker comes back online, it is not able to clean up either the data or the folders that belong to the orphan partitions. The log manager will scan all log dirs during startup, but the time-based retention policy on a topic partition will not kick in until the broker is either a follower or a leader of the partition. In addition, we do not have logic today to delete folders that belong to orphan partitions.
> Open this ticket to provide a mechanism (when enabled) to safely remove orphan partitions automatically.
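The draft approach described in the comment above can be sketched like this (all names are illustrative; the real change would live in LogManager): on startup, any on-disk partition directory that is not in the broker's current assignment is an orphan, and it only becomes eligible for deletion once the retention window has passed.

```java
import java.util.*;

public class OrphanPartitionScanner {
    // Returns the orphan partition dirs that are safe to delete: not assigned
    // to this broker and untouched for longer than the retention window.
    static List<String> orphansEligibleForDeletion(Set<String> onDiskPartitionDirs,
                                                   Set<String> assignedPartitions,
                                                   Map<String, Long> lastModifiedMs,
                                                   long retentionMs, long nowMs) {
        List<String> toDelete = new ArrayList<>();
        for (String dir : onDiskPartitionDirs) {
            boolean orphan = !assignedPartitions.contains(dir);
            // missing mtime defaults to "now" so the dir is conservatively kept
            boolean pastRetention =
                nowMs - lastModifiedMs.getOrDefault(dir, nowMs) > retentionMs;
            if (orphan && pastRetention)
                toDelete.add(dir);
        }
        return toDelete;
    }
}
```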
[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16804577#comment-16804577 ]

xiongqi wu commented on KAFKA-7362:
-----------------------------------

Yes, I am still looking forward to fixing this issue. I put this item on hold because 1) there was no active discussion on KIP-370, and 2) I am also thinking about combining this with topic deletion.

Today a topic can only be deleted when all related brokers are online. This causes topic deletion to get stuck if a broker fails and never comes back. I was thinking of combining this feature with a change to the topic deletion state machine, so that topic deletion can always succeed and orphan partitions can be safely removed when offline brokers come back online.

I will update the ticket and KIP-370 after I think a little more about item 2.

> enable kafka broker to remove orphan partitions automatically
> -------------------------------------------------------------
>
>                 Key: KAFKA-7362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7362
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core, log
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, those removed partitions become orphan partitions to the broker. When the offline broker comes back online, it is not able to clean up either the data or the folders that belong to the orphan partitions. The log manager will scan all log dirs during startup, but the time-based retention policy on a topic partition will not kick in until the broker is either a follower or a leader of the partition. In addition, we do not have logic today to delete folders that belong to orphan partitions.
> Open this ticket to provide a mechanism (when enabled) to safely remove orphan partitions automatically.
[jira] [Created] (KAFKA-7650) make "auto.create.topics.enable" dynamically configurable.
xiongqi wu created KAFKA-7650:
---------------------------------

             Summary: make "auto.create.topics.enable" dynamically configurable.
                 Key: KAFKA-7650
                 URL: https://issues.apache.org/jira/browse/KAFKA-7650
             Project: Kafka
          Issue Type: Improvement
            Reporter: xiongqi wu
            Assignee: xiongqi wu

There are several use cases where we want "auto.create.topics.enable" to be dynamically configurable. For example:

1) a wildcard consumer can recreate deleted topics
2) we have also seen misconfigured consumers that consume from the wrong cluster end up creating a lot of zombie topics in the target cluster

In such cases, we may want to temporarily disable "auto.create.topics.enable" and re-enable topic creation later, after the problem is solved, without restarting brokers.
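What "dynamically configurable" would mean in practice can be sketched as follows (illustrative names, not broker internals): the auto-create decision reads a mutable flag instead of a value fixed at startup, so an operator can flip it at runtime without a broker restart.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class DynamicAutoCreate {
    // Stand-in for a dynamically reconfigurable broker config value.
    static final AtomicBoolean autoCreateTopicsEnable = new AtomicBoolean(true);

    // Returns true if the topic was auto-created by this call.
    static boolean maybeAutoCreate(String topic, Set<String> existingTopics) {
        if (existingTopics.contains(topic)) return false; // already exists
        if (!autoCreateTopicsEnable.get()) return false;  // dynamically disabled
        existingTopics.add(topic);                        // stand-in for real topic creation
        return true;
    }
}
```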
[jira] [Created] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response
xiongqi wu created KAFKA-7501:
---------------------------------

             Summary: double deallocation of producer batch upon expiration of inflight requests and error response
                 Key: KAFKA-7501
                 URL: https://issues.apache.org/jira/browse/KAFKA-7501
             Project: Kafka
          Issue Type: Bug
          Components: clients
            Reporter: xiongqi wu
            Assignee: xiongqi wu

The following event sequence leads to double deallocation of a producer batch:

1) a producer batch is sent and the response is not received
2) the inflight producer batch expires when deliveryTimeoutMs is reached; the sender fails the producer batch via "failBatch" and the batch is deallocated via "accumulator.deallocate(batch)"
3) the response for the batch finally arrives after batch expiration, and the response contains the error "Errors.MESSAGE_TOO_LARGE"
4) the producer batch is split and the original batch is deallocated a second time

As a result, an "IllegalStateException" is raised.
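The hazard and one possible guard can be sketched like this (illustrative names, not the real ProducerBatch API): both the delivery-timeout path and the late-response path funnel through a single completion method, and a compare-and-set ensures the batch is deallocated exactly once.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerBatchSketch {
    private final AtomicBoolean done = new AtomicBoolean(false);
    final AtomicInteger deallocations = new AtomicInteger();

    // Returns true only for the first completion; any later path (e.g. a late
    // MESSAGE_TOO_LARGE response after expiration) becomes a no-op.
    boolean tryCompleteAndDeallocate() {
        if (!done.compareAndSet(false, true))
            return false;                     // already completed elsewhere
        deallocations.incrementAndGet();      // stand-in for accumulator.deallocate(batch)
        return true;
    }
}
```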
[jira] [Created] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
xiongqi wu created KAFKA-7362:
---------------------------------

             Summary: enable kafka broker to remove orphan partitions automatically
                 Key: KAFKA-7362
                 URL: https://issues.apache.org/jira/browse/KAFKA-7362
             Project: Kafka
          Issue Type: Improvement
          Components: core, log
            Reporter: xiongqi wu
            Assignee: xiongqi wu

When partition reassignment removes topic partitions from an offline broker, those removed partitions become orphan partitions to the broker. When the offline broker comes back online, it is not able to clean up either the data or the folders that belong to the orphan partitions. The log manager will scan all log dirs during startup, but the time-based retention policy on a topic partition will not kick in until the broker is either a follower or a leader of the partition. In addition, we do not have logic today to delete folders that belong to orphan partitions.

Open this ticket to provide a mechanism (when enabled) to safely remove orphan partitions automatically.
[jira] [Commented] (KAFKA-7322) race between compaction thread and retention thread when changing topic cleanup policy
[ https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589199#comment-16589199 ]

xiongqi wu commented on KAFKA-7322:
-----------------------------------

[~dhruvilshah] I believe this race condition is different from the race in KAFKA-7278. This race is caused by compaction reading from the log segment while the retention thread deletes the log segment. Ideally, we should not let the compaction and retention threads work on the same partition.

> race between compaction thread and retention thread when changing topic cleanup policy
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7322
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Major
>
> The deletion thread grabs the log.lock when it renames a log segment and schedules it for actual deletion.
> The compaction thread only grabs the log.lock when it replaces the original segments with the cleaned segment; it does not grab the lock while it reads records from the original segments to build the offset map and new segments. As a result, if both the deletion and compaction threads work on the same log partition, we have a race condition.
> This race happens when the topic cleanup policy is updated on the fly.
> One sequence that hits this race condition:
> 1: the topic cleanup policy is "compact" initially
> 2: the log cleaner (compaction) thread picks up the partition for compaction and is still in progress
> 3: the topic cleanup policy is updated to "delete"
> 4: the retention thread picks up the topic partition and deletes some old segments
> 5: the log cleaner thread reads from the deleted log and raises an IO exception
>
> The proposed solution is to use the "in-progress" map that the cleaner manager has to protect against such a race.
[jira] [Created] (KAFKA-7322) race between compaction thread and retention thread when changing topic cleanup policy
xiongqi wu created KAFKA-7322:
---------------------------------

             Summary: race between compaction thread and retention thread when changing topic cleanup policy
                 Key: KAFKA-7322
                 URL: https://issues.apache.org/jira/browse/KAFKA-7322
             Project: Kafka
          Issue Type: Bug
          Components: log
            Reporter: xiongqi wu
            Assignee: xiongqi wu

The deletion thread grabs the log.lock when it renames a log segment and schedules it for actual deletion.

The compaction thread only grabs the log.lock when it replaces the original segments with the cleaned segment; it does not grab the lock while it reads records from the original segments to build the offset map and new segments. As a result, if both the deletion and compaction threads work on the same log partition, we have a race condition.

This race happens when the topic cleanup policy is updated on the fly. One sequence that hits this race condition:

1: the topic cleanup policy is "compact" initially
2: the log cleaner (compaction) thread picks up the partition for compaction and is still in progress
3: the topic cleanup policy is updated to "delete"
4: the retention thread picks up the topic partition and deletes some old segments
5: the log cleaner thread reads from the deleted log and raises an IO exception

The proposed solution is to use the "in-progress" map that the cleaner manager has to protect against such a race.
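The proposed "in-progress" protection can be sketched minimally as follows (illustrative names, not LogCleanerManager's real API): a partition must be checked out of a shared set before either thread touches it, so compaction and retention can never work on the same partition concurrently.

```java
import java.util.HashSet;
import java.util.Set;

public class InProgressGuard {
    private final Set<String> inProgress = new HashSet<>();

    // Returns false if another thread already holds the partition; the caller
    // must then skip it rather than read or delete its segments.
    synchronized boolean checkOut(String topicPartition) {
        return inProgress.add(topicPartition);
    }

    // Releases the partition once cleaning or deletion has finished.
    synchronized void checkIn(String topicPartition) {
        inProgress.remove(topicPartition);
    }
}
```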
[jira] [Assigned] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
[ https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xiongqi wu reassigned KAFKA-7321:
---------------------------------

    Assignee: xiongqi wu

> ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7321
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7321
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>            Reporter: xiongqi wu
>            Assignee: xiongqi wu
>            Priority: Major
>
> _Compaction enables Kafka to remove old messages that are flagged for deletion while other messages can be retained for a relatively longer time. Today, a log segment may remain un-compacted for a long time since the eligibility for log compaction is determined based on the compaction ratio (“min.cleanable.dirty.ratio”) and min compaction lag ("min.compaction.lag.ms") settings. The ability to delete a log message through compaction in a timely manner has become an important requirement in some use cases (e.g., GDPR). For example, one use case is to delete PII (Personally Identifiable Information) data within 7 days while keeping non-PII data indefinitely in compacted format. The goal of this change is to provide a time-based compaction policy that ensures the cleanable section is compacted after the specified time interval regardless of dirty ratio and “min compaction lag”. However, dirty ratio and “min compaction lag” are still honored if the time-based compaction rule is not violated. In other words, if Kafka receives a deletion request on a key (e.g., a key with a null value), the corresponding log segment will be picked up for compaction after the configured time interval to remove the key._
>
> _This is to track effort in KIP 354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_
[jira] [Created] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
xiongqi wu created KAFKA-7321:
---------------------------------

             Summary: ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)
                 Key: KAFKA-7321
                 URL: https://issues.apache.org/jira/browse/KAFKA-7321
             Project: Kafka
          Issue Type: Improvement
          Components: log
            Reporter: xiongqi wu

_Compaction enables Kafka to remove old messages that are flagged for deletion while other messages can be retained for a relatively longer time. Today, a log segment may remain un-compacted for a long time since the eligibility for log compaction is determined based on the compaction ratio (“min.cleanable.dirty.ratio”) and min compaction lag ("min.compaction.lag.ms") settings. The ability to delete a log message through compaction in a timely manner has become an important requirement in some use cases (e.g., GDPR). For example, one use case is to delete PII (Personally Identifiable Information) data within 7 days while keeping non-PII data indefinitely in compacted format. The goal of this change is to provide a time-based compaction policy that ensures the cleanable section is compacted after the specified time interval regardless of dirty ratio and “min compaction lag”. However, dirty ratio and “min compaction lag” are still honored if the time-based compaction rule is not violated. In other words, if Kafka receives a deletion request on a key (e.g., a key with a null value), the corresponding log segment will be picked up for compaction after the configured time interval to remove the key._

_This is to track effort in KIP 354:_
_https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_
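The eligibility rule described above can be captured in a small illustrative sketch (not Kafka's actual cleaner code): once max.compaction.lag.ms has elapsed for the earliest dirty data, the log must be compacted regardless of dirty ratio; otherwise the usual dirty-ratio check still applies.

```java
public class CompactionEligibility {
    // All parameters are stand-ins for values derived from the log and its
    // config; min.compaction.lag.ms filtering of individual segments is omitted.
    static boolean eligible(double dirtyRatio, double minCleanableDirtyRatio,
                            long earliestDirtyTimestampMs,
                            long maxCompactionLagMs, long nowMs) {
        // time-based rule: overdue data forces compaction unconditionally
        boolean lagViolated = nowMs - earliestDirtyTimestampMs > maxCompactionLagMs;
        // otherwise the normal dirty-ratio policy is still honored
        return lagViolated || dirtyRatio >= minCleanableDirtyRatio;
    }
}
```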