[GitHub] [kafka] yashmayya opened a new pull request, #13800: KAFKA-15012: Allow leading zeros in numeric fields while deserializing JSON messages using the JsonConverter
yashmayya opened a new pull request, #13800: URL: https://github.com/apache/kafka/pull/13800 - https://issues.apache.org/jira/browse/KAFKA-15012 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field
[ https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-15012: -- Assignee: Yash Mayya > JsonConverter fails when there are leading Zeros in a field > --- > > Key: KAFKA-15012 > URL: https://issues.apache.org/jira/browse/KAFKA-15012 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.4.0, 3.3.2 >Reporter: Ranjan Rao >Assignee: Yash Mayya >Priority: Major > Attachments: > enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch > > > When there are leading zeros in a field in the Kakfa Record, a sink connector > using JsonConverter fails with the below exception > > {code:java} > org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error > handler > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] > to Kafka Connect data failed due to serialization error: > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324) > at > org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) > ... 
13 more > Caused by: org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading > zeroes not allowed > at [Source: (byte[])"00080153032837"; line: 1, column: 2] > Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric > value: Leading zeroes not allowed > at [Source: (byte[])"00080153032837"; line: 1, column: 2] > at > com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) > at > com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) > at > com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855) > at > com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754) > at > com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247) > at > com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734) > at > org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64) > at > org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322) > at > org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531) > at > o
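The attachment named on the Jira issue, enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch, points at Jackson's leading-zeros read feature. The standalone sketch below only demonstrates that Jackson feature against the failing payload from the stack trace; it is not the JsonConverter change proposed in PR #13800, and the class name is invented for illustration.

{code:java}
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

import java.nio.charset.StandardCharsets;

// Illustrative class, not part of Kafka Connect.
public class LeadingZerosExample {
    public static void main(String[] args) throws Exception {
        byte[] payload = "00080153032837".getBytes(StandardCharsets.UTF_8);

        // A default ObjectMapper rejects this payload with
        // "Invalid numeric value: Leading zeroes not allowed":
        // new ObjectMapper().readTree(payload);

        // Jackson 2.10+ exposes the switch as JsonReadFeature; older versions
        // use JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS instead.
        ObjectMapper lenient = JsonMapper.builder()
                .enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS)
                .build();

        JsonNode node = lenient.readTree(payload);
        System.out.println(node.asLong()); // 80153032837
    }
}
{code}

With the feature enabled, the payload parses as the number 80153032837 instead of throwing a JsonParseException.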
[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load
[ https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728551#comment-17728551 ] Haruki Okada commented on KAFKA-15046: -- [~showuon] Maybe I linked wrong file. What I thought is to make any LeaderEpochFileCache methods (which needs flush()) to be called outside of Log's global lock. LeaderEpochFileCache already does exclusive control by its RW lock so I think we don't need to call it inside the Log's global lock. [https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L44] > Produce performance issue under high disk load > -- > > Key: KAFKA-15046 > URL: https://issues.apache.org/jira/browse/KAFKA-15046 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.3.2 >Reporter: Haruki Okada >Priority: Major > Labels: performance > Attachments: image-2023-06-01-12-46-30-058.png, > image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, > image-2023-06-01-12-56-19-108.png > > > * Phenomenon: > ** !image-2023-06-01-12-46-30-058.png|width=259,height=236! > ** Producer response time 99%ile got quite bad when we performed replica > reassignment on the cluster > *** RequestQueue scope was significant > ** Also request-time throttling happened at the incidental time. This caused > producers to delay sending messages in the mean time. > ** The disk I/O latency was higher than usual due to the high load for > replica reassignment. > *** !image-2023-06-01-12-56-19-108.png|width=255,height=128! > * Analysis: > ** The request-handler utilization was much higher than usual. > *** !image-2023-06-01-12-52-40-959.png|width=278,height=113! > ** Also, thread time utilization was much higher than usual on almost all > users > *** !image-2023-06-01-12-54-04-211.png|width=276,height=110! > ** From taking jstack several times, for most of them, we found that a > request-handler was doing fsync for flusing ProducerState and meanwhile other > request-handlers were waiting Log#lock for appending messages. 
> * > ** > *** > {code:java} > "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 > cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 > runnable [0x7ef9a12e2000] >java.lang.Thread.State: RUNNABLE > at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native > Method) > at > sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82) > at > sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461) > at > kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451) > at > kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754) > at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:919) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956) > at > kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown > Source) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602) > at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666) > at kafka.server.KafkaApis.handle(KafkaApis.scala:175) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) > at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code} > * > ** Also there were bunch of logs that writing producer snapshots took > hundreds of milliseconds. > *** > {code:java} > ... > [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote > producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. > (kafka.log.ProducerStateManager) > [2023-05-01 11:08:37,319] INFO [ProducerStateManager partition=yyy-34] W
[jira] [Comment Edited] (KAFKA-15046) Produce performance issue under high disk load
[ https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728515#comment-17728515 ] Haruki Okada edited comment on KAFKA-15046 at 6/2/23 4:01 AM: -- Yeah, io_uring is promising. However it only works with newer kernel (which some on-premises Kafka users may not be easy to update) and require rewriting a lot of parts of the code base. -For leader-epoch cache, the checkpointing is already done in scheduler thread so we should adopt solution2 I think- For leader epoch cache, some paths already doing checkpointing asynchronously (e.g. UnifiedLog.deleteOldSegments => UnifiedLog.maybeIncrementLogStartOffset => LeaderEpochFileCache.truncateFromStart on kafka scheduler), so we have to make fsync called outside of the lock (i.e. solution-2) anyways I think. Writing to CheckpointFile is already synchronized, so can't we just move checkpointing to outside of the lock? [https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76] was (Author: ocadaruma): Yeah, io_uring is promising. However it only works with newer kernel (which some on-premises Kafka users may not be easy to update) and require rewriting a lot of parts of the code base. For leader-epoch cache, the checkpointing is already done in scheduler thread so we should adopt solution2 I think. Writing to CheckpointFile is already synchronized, so can't we just move checkpointing to outside of the lock? [https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76] > Produce performance issue under high disk load > -- > > Key: KAFKA-15046 > URL: https://issues.apache.org/jira/browse/KAFKA-15046 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.3.2 >Reporter: Haruki Okada >Priority: Major > Labels: performance > Attachments: image-2023-06-01-12-46-30-058.png, > image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, > image-2023-06-01-12-56-19-108.png > > > * Phenomenon: > ** !image-2023-06-01-12-46-30-058.png|width=259,height=236! > ** Producer response time 99%ile got quite bad when we performed replica > reassignment on the cluster > *** RequestQueue scope was significant > ** Also request-time throttling happened at the incidental time. This caused > producers to delay sending messages in the mean time. > ** The disk I/O latency was higher than usual due to the high load for > replica reassignment. > *** !image-2023-06-01-12-56-19-108.png|width=255,height=128! > * Analysis: > ** The request-handler utilization was much higher than usual. > *** !image-2023-06-01-12-52-40-959.png|width=278,height=113! > ** Also, thread time utilization was much higher than usual on almost all > users > *** !image-2023-06-01-12-54-04-211.png|width=276,height=110! > ** From taking jstack several times, for most of them, we found that a > request-handler was doing fsync for flusing ProducerState and meanwhile other > request-handlers were waiting Log#lock for appending messages. 
> * > ** > *** > {code:java} > "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 > cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 > runnable [0x7ef9a12e2000] >java.lang.Thread.State: RUNNABLE > at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native > Method) > at > sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82) > at > sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461) > at > kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451) > at > kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754) > at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:919) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956) > at > kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown > Source) > at > scala.collection.StrictOptim
[GitHub] [kafka] dengziming closed pull request #13761: Reproduce KAFKA-14996, Test restart controller
dengziming closed pull request #13761: Reproduce KAFKA-14996, Test restart controller URL: https://github.com/apache/kafka/pull/13761 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming closed pull request #13777: KAFKA-15036: UnknownServerError on any leader failover
dengziming closed pull request #13777: KAFKA-15036: UnknownServerError on any leader failover URL: https://github.com/apache/kafka/pull/13777 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #13777: KAFKA-15036: UnknownServerError on any leader failover
dengziming commented on PR #13777: URL: https://github.com/apache/kafka/pull/13777#issuecomment-1573073315 this will be fixed in #13799 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a diff in pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors
dengziming commented on code in PR #13799: URL: https://github.com/apache/kafka/pull/13799#discussion_r1213858197 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -425,25 +430,24 @@ public void accept(ConfigResource configResource) { public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler"; -private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX = -"The active controller appears to be node "; - -private NotControllerException newNotControllerException() { -OptionalInt latestController = raftClient.leaderAndEpoch().leaderId(); -if (latestController.isPresent()) { -return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX + -latestController.getAsInt() + "."); -} else { -return new NotControllerException("No controller appears to be active."); -} +private OptionalInt latestController() { +return raftClient.leaderAndEpoch().leaderId(); } -private NotControllerException newPreMigrationException() { -OptionalInt latestController = raftClient.leaderAndEpoch().leaderId(); -if (latestController.isPresent()) { -return new NotControllerException("The controller is in pre-migration mode."); +/** + * @return The offset that we should perform read operations at. + */ +private long currentReadOffset() { +if (isActiveController()) { +// The active controller keeps an in-memory snapshot at the last committed offset, +// which we want to read from when performing read operations. This will avoid +// reading uncommitted data. +return lastCommittedOffset; } else { -return new NotControllerException("No controller appears to be active."); +// Standby controllers never have uncommitted data in memory. Therefore, we return +// Long.MAX_VALUE, a special value which means "always read the latest from every +// data structure." +return Long.MAX_VALUE; Review Comment: How about using `SnapshotRegistry.LATEST_EPOCH` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
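For readers unfamiliar with the snapshot/epoch idea referenced here (reading at the last committed offset on the active controller, or passing Long.MAX_VALUE / a LATEST_EPOCH constant on a standby), the toy example below shows why a maximal sentinel works as "read the newest value". It is deliberately simplified, is not Kafka's SnapshotRegistry, and all names in it are invented.

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration only: versioned reads keyed by offset.
public class VersionedValue<T> {
    // Sentinel meaning "read the newest value, including uncommitted changes".
    public static final long LATEST_EPOCH = Long.MAX_VALUE;

    // Value as of each offset at which it changed.
    private final TreeMap<Long, T> history = new TreeMap<>();

    public void set(long offset, T value) {
        history.put(offset, value);
    }

    /** Returns the value as of the given read offset, or null if none exists yet. */
    public T get(long readOffset) {
        Map.Entry<Long, T> e = history.floorEntry(readOffset);
        return e == null ? null : e.getValue();
    }
}
{code}

An active controller would pass its lastCommittedOffset so callers never observe changes still waiting on the quorum, while a standby, which holds no uncommitted state, can safely pass LATEST_EPOCH.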
[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load
[ https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728547#comment-17728547 ] Luke Chen commented on KAFKA-15046: --- I agree that io_uring is a good way to fix that, but it needs more discussion. And maybe we still need a fallback solution for old OS. > Writing to CheckpointFile is already synchronized, so can't we just move > checkpointing to outside of the lock? [~ocadaruma] , I don't get it. If we don't lock the write, there might be other threads read partial of data, or have concurrent write. Why do you think we don't need the lock? > Produce performance issue under high disk load > -- > > Key: KAFKA-15046 > URL: https://issues.apache.org/jira/browse/KAFKA-15046 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.3.2 >Reporter: Haruki Okada >Priority: Major > Labels: performance > Attachments: image-2023-06-01-12-46-30-058.png, > image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, > image-2023-06-01-12-56-19-108.png > > > * Phenomenon: > ** !image-2023-06-01-12-46-30-058.png|width=259,height=236! > ** Producer response time 99%ile got quite bad when we performed replica > reassignment on the cluster > *** RequestQueue scope was significant > ** Also request-time throttling happened at the incidental time. This caused > producers to delay sending messages in the mean time. > ** The disk I/O latency was higher than usual due to the high load for > replica reassignment. > *** !image-2023-06-01-12-56-19-108.png|width=255,height=128! > * Analysis: > ** The request-handler utilization was much higher than usual. > *** !image-2023-06-01-12-52-40-959.png|width=278,height=113! > ** Also, thread time utilization was much higher than usual on almost all > users > *** !image-2023-06-01-12-54-04-211.png|width=276,height=110! > ** From taking jstack several times, for most of them, we found that a > request-handler was doing fsync for flusing ProducerState and meanwhile other > request-handlers were waiting Log#lock for appending messages. 
> * > ** > *** > {code:java} > "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 > cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 > runnable [0x7ef9a12e2000] >java.lang.Thread.State: RUNNABLE > at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native > Method) > at > sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82) > at > sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461) > at > kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451) > at > kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754) > at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:919) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956) > at > kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown > Source) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602) > at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666) > at kafka.server.KafkaApis.handle(KafkaApis.scala:175) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) > at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code} > * > ** Also there were bunch of logs that writing producer snapshots took > hundreds of milliseconds. > *** > {code:java} > ... > [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote > producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. > (kafka.log.ProducerStateManager) > [2023-05-01 11:08:37,319] INFO [ProducerStateManager parti
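The solution-2 idea under discussion (move the fsync out of the log's global lock) typically takes the shape sketched below: copy the in-memory state while holding the hot lock, then perform the slow write and force() under the checkpoint file's own lock, so concurrent readers never see a partially written file and appends are not blocked behind disk I/O. This is only an illustrative sketch under those assumptions; it is not the UnifiedLog, ProducerStateManager, or CheckpointFile code, and every name in it is invented.

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Sketch of "snapshot under the lock, fsync outside the lock".
public class CheckpointSketch {
    private final Object stateLock = new Object();   // stands in for the log's global lock
    private final Object fileLock = new Object();    // stands in for the checkpoint file's own lock
    private final List<String> entries = new ArrayList<>();

    public void appendEntry(String e) {
        synchronized (stateLock) {
            entries.add(e);                           // fast, in-memory only
        }
    }

    public void checkpoint(Path snapshotPath) throws IOException {
        List<String> copy;
        synchronized (stateLock) {
            copy = new ArrayList<>(entries);          // cheap copy while holding the hot lock
        }
        // Slow disk I/O happens outside the hot lock; threads calling appendEntry()
        // are no longer blocked behind the fsync.
        synchronized (fileLock) {
            try (FileChannel ch = FileChannel.open(snapshotPath,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING)) {
                for (String e : copy) {
                    ch.write(ByteBuffer.wrap((e + "\n").getBytes(StandardCharsets.UTF_8)));
                }
                ch.force(true);                       // the fsync that used to stall appends
            }
        }
    }
}
{code}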
[GitHub] [kafka] artemlivshits commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)
artemlivshits commented on code in PR #13787: URL: https://github.com/apache/kafka/pull/13787#discussion_r1213645281 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -579,9 +579,29 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } + /** + * Maybe create and return the verification state for the given producer ID if the transaction is not ongoing. Otherwise return null. + */ + def transactionNeedsVerifying(producerId: Long): Object = lock synchronized { Review Comment: The name implies that it's a function with no side effects, but it actually starts verification by installing some state. Maybe we should use "maybeStartTransactionVerification" or something like that. ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -184,6 +186,27 @@ private void clearProducerIds() { producerIdCount = 0; } +/** + * Maybe create the VerificationStateEntry for a given producer ID. Return it if it exists, otherwise return null. + */ +public VerificationStateEntry verificationStateEntry(long producerId, boolean createIfAbsent) { +return verificationStates.computeIfAbsent(producerId, pid -> { +if (createIfAbsent) +return new VerificationStateEntry(pid, time.milliseconds()); +else { +log.warn("The given producer ID did not have an entry in the producer state manager, so it's state will be returned as null"); Review Comment: Isn't it expected that we find no entry if we got a race condition with a concurrent abort? ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1073,7 +1076,8 @@ class ReplicaManager(val config: KafkaConfig, origin: AppendOrigin, entriesPerPartition: Map[TopicPartition, MemoryRecords], requiredAcks: Short, - requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = { + requestLocal: RequestLocal, + verificationState: Object): Map[TopicPartition, LogAppendResult] = { Review Comment: This should be per partition. Either this should be a Map[TopicPartition, Object] or it should be added to the entriesPerPartition map. ## storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +/** + * This class represents the verification state of a specific producer-id. + * It contains a verificationState object that is used to uniquely identify the transaction we want to verify. + * After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction + * may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions. 
+ * We remove the verification state whenever we write data to the transaction or write an end marker for the transaction. + * Any lingering entries that are never verified are removed via the producer state entry cleanup mechanism. + */ +public class VerificationStateEntry { + +private final long producerId; +private long timestamp; +private Object verificationState; Review Comment: The purpose of this to protect atomicity of verification operation from concurrent modifications (sort of 'optimistic locking'), maybe verificationSentinel or verificationGuard or verificationTripwire would be a better name? ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1004,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state. Review Comment: I think we need to add some d
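The "verificationGuard" naming suggested above describes an optimistic-concurrency pattern along the following lines: verification installs an opaque sentinel object for the producer id, writing data or an end marker for the transaction removes it, and the append path proceeds only if the identical sentinel is still installed. The sketch is illustrative only; it is not the ProducerStateManager API from PR #13787, and its class and method names are invented.

{code:java}
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of a per-producer verification guard.
public class VerificationGuardSketch {
    private final ConcurrentHashMap<Long, Object> guards = new ConcurrentHashMap<>();

    /** Called before contacting the transaction coordinator for verification. */
    public Object maybeStartVerification(long producerId) {
        return guards.computeIfAbsent(producerId, pid -> new Object());
    }

    /** Called when data or a control marker (commit/abort) is written: invalidates the guard. */
    public void clearVerification(long producerId) {
        guards.remove(producerId);
    }

    /** Called right before appending transactional data. */
    public boolean verificationStillValid(long producerId, Object guard) {
        // Reference equality: a guard created for an earlier, now-completed attempt
        // will not match, so a late append cannot re-open a finished transaction.
        return guard != null && guards.get(producerId) == guard;
    }
}
{code}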
[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load
[ https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728515#comment-17728515 ] Haruki Okada commented on KAFKA-15046: -- Yeah, io_uring is promising. However it only works with newer kernel (which some on-premises Kafka users may not be easy to update) and require a lot of parts of the code base. For leader-epoch cache, the checkpointing is already done in scheduler thread so we should adopt solution2 I think. Writing to CheckpointFile is already synchronized, so can't we just move checkpointing to outside of the lock? https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76 > Produce performance issue under high disk load > -- > > Key: KAFKA-15046 > URL: https://issues.apache.org/jira/browse/KAFKA-15046 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.3.2 >Reporter: Haruki Okada >Priority: Major > Labels: performance > Attachments: image-2023-06-01-12-46-30-058.png, > image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, > image-2023-06-01-12-56-19-108.png > > > * Phenomenon: > ** !image-2023-06-01-12-46-30-058.png|width=259,height=236! > ** Producer response time 99%ile got quite bad when we performed replica > reassignment on the cluster > *** RequestQueue scope was significant > ** Also request-time throttling happened at the incidental time. This caused > producers to delay sending messages in the mean time. > ** The disk I/O latency was higher than usual due to the high load for > replica reassignment. > *** !image-2023-06-01-12-56-19-108.png|width=255,height=128! > * Analysis: > ** The request-handler utilization was much higher than usual. > *** !image-2023-06-01-12-52-40-959.png|width=278,height=113! > ** Also, thread time utilization was much higher than usual on almost all > users > *** !image-2023-06-01-12-54-04-211.png|width=276,height=110! > ** From taking jstack several times, for most of them, we found that a > request-handler was doing fsync for flusing ProducerState and meanwhile other > request-handlers were waiting Log#lock for appending messages. 
> * > ** > *** > {code:java} > "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 > cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 > runnable [0x7ef9a12e2000] >java.lang.Thread.State: RUNNABLE > at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native > Method) > at > sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82) > at > sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461) > at > kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451) > at > kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754) > at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:919) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956) > at > kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown > Source) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602) > at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666) > at kafka.server.KafkaApis.handle(KafkaApis.scala:175) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) > at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code} > * > ** Also there were bunch of logs that writing producer snapshots took > hundreds of milliseconds. > *** > {code:java} > ... > [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote > producer snapshot at offset 1748817854 with 8 producer ids
[jira] [Comment Edited] (KAFKA-15046) Produce performance issue under high disk load
[ https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728515#comment-17728515 ] Haruki Okada edited comment on KAFKA-15046 at 6/2/23 12:01 AM: --- Yeah, io_uring is promising. However it only works with newer kernel (which some on-premises Kafka users may not be easy to update) and require rewriting a lot of parts of the code base. For leader-epoch cache, the checkpointing is already done in scheduler thread so we should adopt solution2 I think. Writing to CheckpointFile is already synchronized, so can't we just move checkpointing to outside of the lock? [https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76] was (Author: ocadaruma): Yeah, io_uring is promising. However it only works with newer kernel (which some on-premises Kafka users may not be easy to update) and require a lot of parts of the code base. For leader-epoch cache, the checkpointing is already done in scheduler thread so we should adopt solution2 I think. Writing to CheckpointFile is already synchronized, so can't we just move checkpointing to outside of the lock? https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76 > Produce performance issue under high disk load > -- > > Key: KAFKA-15046 > URL: https://issues.apache.org/jira/browse/KAFKA-15046 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.3.2 >Reporter: Haruki Okada >Priority: Major > Labels: performance > Attachments: image-2023-06-01-12-46-30-058.png, > image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, > image-2023-06-01-12-56-19-108.png > > > * Phenomenon: > ** !image-2023-06-01-12-46-30-058.png|width=259,height=236! > ** Producer response time 99%ile got quite bad when we performed replica > reassignment on the cluster > *** RequestQueue scope was significant > ** Also request-time throttling happened at the incidental time. This caused > producers to delay sending messages in the mean time. > ** The disk I/O latency was higher than usual due to the high load for > replica reassignment. > *** !image-2023-06-01-12-56-19-108.png|width=255,height=128! > * Analysis: > ** The request-handler utilization was much higher than usual. > *** !image-2023-06-01-12-52-40-959.png|width=278,height=113! > ** Also, thread time utilization was much higher than usual on almost all > users > *** !image-2023-06-01-12-54-04-211.png|width=276,height=110! > ** From taking jstack several times, for most of them, we found that a > request-handler was doing fsync for flusing ProducerState and meanwhile other > request-handlers were waiting Log#lock for appending messages. 
> * > ** > *** > {code:java} > "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 > cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 > runnable [0x7ef9a12e2000] >java.lang.Thread.State: RUNNABLE > at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native > Method) > at > sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82) > at > sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461) > at > kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451) > at > kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754) > at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:919) > - locked <0x00060d75d820> (a java.lang.Object) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956) > at > kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown > Source) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944) > at kafka.server.Repl
[jira] [Updated] (KAFKA-15048) Improve handling of unexpected quorum controller errors
[ https://issues.apache.org/jira/browse/KAFKA-15048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15048: - Summary: Improve handling of unexpected quorum controller errors (was: Improve handling of non-fatal quorum controller errors) > Improve handling of unexpected quorum controller errors > --- > > Key: KAFKA-15048 > URL: https://issues.apache.org/jira/browse/KAFKA-15048 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15048) Improve handling of non-fatal quorum controller errors
[ https://issues.apache.org/jira/browse/KAFKA-15048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-15048: Assignee: Colin McCabe > Improve handling of non-fatal quorum controller errors > -- > > Key: KAFKA-15048 > URL: https://issues.apache.org/jira/browse/KAFKA-15048 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15048) Improve handling of non-fatal quorum controller errors
Colin McCabe created KAFKA-15048: Summary: Improve handling of non-fatal quorum controller errors Key: KAFKA-15048 URL: https://issues.apache.org/jira/browse/KAFKA-15048 Project: Kafka Issue Type: Bug Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces
CalvinConfluent commented on code in PR #13794: URL: https://github.com/apache/kafka/pull/13794#discussion_r1213764761 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Replayable.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +/** + * The Replayable interface. + */ +public interface Replayable { Review Comment: Do we want to name it more specifically to the coordinators? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1213757295 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -33,7 +35,8 @@ object AddPartitionsToTxnManager { class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, - val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback], + val earliestAdditionMs: Long) Review Comment: The current approach finds the max verification time across the transaction data for the node. I can either change this or clarify it in the metric description. I wanted to take a per-transaction approach, but it gets a little harder to store the per-transaction data since the transaction data is an already defined collection. I could instead create a (parallel) map for each transaction if that makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan commented on code in PR #13798: URL: https://github.com/apache/kafka/pull/13798#discussion_r1213756543 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -85,12 +93,14 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time topicPartitionsToError.put(new TopicPartition(topic.name, partition), error) } } +verificationFailureRate.mark(topicPartitionsToError.size) topicPartitionsToError.toMap } private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { override def onComplete(response: ClientResponse): Unit = { // Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread. + verificationTimeMs.update(time.milliseconds() - transactionDataAndCallbacks.earliestAdditionMs) Review Comment: The KIP originally included the phrasing `This will also account for verifications that fail before the coordinator is called.` However, on further thought, I wonder if it is better to only update the value on responses from the transaction coordinator. If not, I can also populate the metric on earlier callbacks, but then the meaning gets a little convoluted because the callback only applies to a single transaction that may not be the earliest one added for the node. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
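For readers following the metric semantics being debated here, the two KIP-890 metrics behave roughly as in the sketch below: VerificationTimeMs is a histogram updated with now minus earliestAdditionMs when a response arrives, and VerificationFailureRate is a meter marked once per failed partition. This is a standalone illustration assuming the Yammer metrics 2.x API bundled with the broker; the actual PR #13798 registers the metrics through Kafka's metrics group helpers, and the wrapper class here is invented.

{code:java}
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;

import java.util.concurrent.TimeUnit;

// Illustrative wrapper; not the actual AddPartitionsToTxnManager change.
public class VerificationMetricsSketch {
    private final Histogram verificationTimeMs =
        Metrics.newHistogram(VerificationMetricsSketch.class, "VerificationTimeMs", true);
    private final Meter verificationFailureRate =
        Metrics.newMeter(VerificationMetricsSketch.class, "VerificationFailureRate",
            "failures", TimeUnit.SECONDS);

    private volatile long earliestAdditionMs = -1L;

    /** Record when the first pending transaction for a node was handed to the manager. */
    public void onAdded(long nowMs) {
        if (earliestAdditionMs < 0) earliestAdditionMs = nowMs;
    }

    /** On a response from the transaction coordinator, record elapsed time and failures. */
    public void onResponse(long nowMs, int failedPartitions) {
        if (earliestAdditionMs >= 0) {
            verificationTimeMs.update(nowMs - earliestAdditionMs);
            earliestAdditionMs = -1L;
        }
        if (failedPartitions > 0) verificationFailureRate.mark(failedPartitions);
    }
}
{code}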
[GitHub] [kafka] cmccabe closed pull request #13735: KAFKA-15003: When partitions are partition assignments change we do n…
cmccabe closed pull request #13735: KAFKA-15003: When partitions are partition assignments change we do n… URL: https://github.com/apache/kafka/pull/13735 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13735: KAFKA-15003: When partitions are partition assignments change we do n…
cmccabe commented on PR #13735: URL: https://github.com/apache/kafka/pull/13735#issuecomment-1572889255 committed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan opened a new pull request, #13798: KAFKA-15028: AddPartitionsToTxnManager metrics
jolshan opened a new pull request, #13798: URL: https://github.com/apache/kafka/pull/13798 Adding the following metrics as per kip-890: VerificationTimeMs – number of milliseconds from adding partition info to the manager to the time the response is sent. This will include the round trip to the transaction coordinator if it is called. This will also account for verifications that fail before the coordinator is called. VerificationFailureRate – rate of verifications that returned in failure either from the AddPartitionsToTxn response or through errors in the manager. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15028) AddPartitionsToTxnManager metrics
[ https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-15028: -- Assignee: Justine Olshan > AddPartitionsToTxnManager metrics > - > > Key: KAFKA-15028 > URL: https://issues.apache.org/jira/browse/KAFKA-15028 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > KIP-890 added metrics for the AddPartitionsToTxnManager > VerificationTimeMs – number of milliseconds from adding partition info to the > manager to the time the response is sent. This will include the round trip to > the transaction coordinator if it is called. This will also account for > verifications that fail before the coordinator is called. > VerificationFailureRate – rate of verifications that returned in failure > either from the AddPartitionsToTxn response or through errors in the manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe commented on a diff in pull request #13735: KAFKA-15003: When partitions are partition assignments change we do n…
cmccabe commented on code in PR #13735: URL: https://github.com/apache/kafka/pull/13735#discussion_r1213740761 ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -235,4 +235,10 @@ public String toString() { builder.append(")"); return builder.toString(); } + +public boolean hasSameAssignment(PartitionRegistration registration) { Review Comment: This should be `equals`. I'll fix it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1213589269 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import org.rocksdb.WriteBatch; + +/** + * A RocksDB backed time-ordered segmented bytes store for window key schema. + */ +public class RocksDBTimeOrderedKeyValueSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore { + +RocksDBTimeOrderedKeyValueSegmentedBytesStore(final String name, + final String metricsScope, + final long retention, + final long segmentInterval, + final boolean withIndex) { +super(name, metricsScope, retention, segmentInterval, new TimeFirstWindowKeySchema(), +Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : null)); +} + +@Override +protected KeyValue getIndexKeyValue(final Bytes baseKey, final byte[] baseValue) { +throw new UnsupportedOperationException("Do not use for TimeOrderedKeyValueStore"); +} + +@Override +Map getWriteBatches(final Collection> records) { Review Comment: correct. I haven't finished the restoration logic yet ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde;
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1213589269 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema; +import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import org.rocksdb.WriteBatch; + +/** + * A RocksDB backed time-ordered segmented bytes store for window key schema. + */ +public class RocksDBTimeOrderedKeyValueSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore { + +RocksDBTimeOrderedKeyValueSegmentedBytesStore(final String name, + final String metricsScope, + final long retention, + final long segmentInterval, + final boolean withIndex) { +super(name, metricsScope, retention, segmentInterval, new TimeFirstWindowKeySchema(), +Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : null)); +} + +@Override +protected KeyValue getIndexKeyValue(final Bytes baseKey, final byte[] baseValue) { +throw new UnsupportedOperationException("Do not use for TimeOrderedKeyValueStore"); +} + +@Override +Map getWriteBatches(final Collection> records) { Review Comment: correct. I haven't finished the restoration logic yet (haven't got to that yet) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
wcarlson5 commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1213395601 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; +private String topic; + +public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueSegmentedBytesStore store, +final Duration gracePeriod, +final String topic) { +super(store); +this.gracePeriod = gracePeriod; +minTimestamp = Long.MAX_VALUE; +numRec = 0; +bufferSize = 0; +this.topic = topic; +} + +@SuppressWarnings("unchecked") +@Override +public void setSerdesIfNull(final SerdeGetter getter) { +keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; +valueSerde = valueSerde == null ? 
FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void init(final StateStoreContext context, final StateStore root) { +wrapped().init(context, wrapped()); +} + +@Override +public void evictWhile(final Supplier predicate, final Consumer> callback) { +KeyValue keyValue = null; + +if (predicate.get()) { +final KeyValueIterator iterator = wrapped() +.fetchAll(0, wrapped().observedStreamTime - gracePeriod.toMillis()); Review Comment: Good catch, completely skipped my mind ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord
[GitHub] [kafka] jolshan merged pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
jolshan merged PR #13666: URL: https://github.com/apache/kafka/pull/13666 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan merged pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager
jolshan merged PR #13769: URL: https://github.com/apache/kafka/pull/13769 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager
jolshan commented on PR #13769: URL: https://github.com/apache/kafka/pull/13769#issuecomment-1572734605 Tests look unrelated. Will merge -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213590037 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread( +threadPrefix + threadId +) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. 
+ */ +private class EventProcessorThread extends Thread { +private final Logger log; + +EventProcessorThread( +String name +) { +super(name); +log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class); +setDaemon(false); +} + +private void handleEvents() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); +if (event != null) { +try { +log.debug("Executing event: {}.", event); +event.run(); +} catch (Throwable t) { +log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t); +event.complete(t); +} finally { +accumulator.done(event); +} +} +} +} + +private void drainEvents() { +CoordinatorEvent event = accumulator.poll(0, TimeUnit.MILLISECONDS); +while (event != null) { +try { +log.debug("Draining event: {}.", event); +event.complete(new RejectedExecutionException("EventProcessor is closed.")); +} catch (Throwable t) { +log.error("Failed to reject event {} due to: {}.", event, t.getMessage(), t); +} finally { +accumulator.done(event); +} + +event = accumulator.poll(0, TimeUnit.MILLISECONDS); +} +} + +@Override +public void run() { +log.info("Starting"); + +try { +handleEvents(); +} catch (Throwable t) { +log.error("Exiting with exception.", t); +} + +// The accumulator is drained and all the pending events are rejected +// when the event processor is s
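The javadoc quoted above describes the key guarantee of the EventAccumulator: events that share a partition key are never processed concurrently. Below is a toy, self-contained sketch of that idea only; the names and structure are illustrative assumptions and this is not the accumulator implemented in the PR.
```
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Toy sketch: events are queued per key, and a key is handed out to at most one
// worker at a time, so no two threads ever process events for the same key concurrently.
final class KeyedAccumulatorSketch<K, E> {
    private final Map<K, Queue<E>> queues = new HashMap<>();
    private final Set<K> inFlight = new HashSet<>();

    synchronized void add(K key, E event) {
        queues.computeIfAbsent(key, k -> new ArrayDeque<>()).add(event);
    }

    // Returns an event whose key is not currently being processed, or null if none is available.
    synchronized Map.Entry<K, E> poll() {
        for (Map.Entry<K, Queue<E>> entry : queues.entrySet()) {
            K key = entry.getKey();
            if (!inFlight.contains(key) && !entry.getValue().isEmpty()) {
                inFlight.add(key);
                return Map.entry(key, entry.getValue().poll());
            }
        }
        return null;
    }

    // Must be called once the event has been processed, releasing the key for other workers.
    synchronized void done(K key) {
        inFlight.remove(key);
    }
}
```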
[GitHub] [kafka] C0urante merged pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark
C0urante merged PR #13776: URL: https://github.com/apache/kafka/pull/13776 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark
C0urante commented on code in PR #13776: URL: https://github.com/apache/kafka/pull/13776#discussion_r1213569347 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.connect; + +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.ReplaceField; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + + +/** + * This benchmark tests the performance of the {@link ReplaceField} {@link org.apache.kafka.connect.transforms.Transformation SMT} + * when configured with a large number of include and exclude fields and applied on a {@link SourceRecord} containing a similarly + * large number of fields. + */ +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class ReplaceFieldBenchmark { + +@Param({"100", "1000", "1"}) +private int fieldCount; +private ReplaceField replaceFieldSmt; +private SourceRecord record; + +@Setup +public void setup() { +this.replaceFieldSmt = new ReplaceField.Value<>(); +Map replaceFieldConfigs = new HashMap<>(); +replaceFieldConfigs.put("exclude", +IntStream.range(0, fieldCount).filter(x -> (x & 1) == 0).mapToObj(x -> "Field-" + x).collect(Collectors.joining(","))); +replaceFieldConfigs.put("include", +IntStream.range(0, fieldCount).filter(x -> (x & 1) == 1).mapToObj(x -> "Field-" + x).collect(Collectors.joining(","))); Review Comment: Awesome, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] novosibman commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version
novosibman commented on PR #13782: URL: https://github.com/apache/kafka/pull/13782#issuecomment-1572620634 Provided an updated change: returned to the original try-with-resources on writing and added a utility method for flushing:
```
try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
    fileChannel.write(buffer);
}
if (scheduler != null) {
    scheduler.scheduleOnce("flush-producer-snapshot", () -> Utils.flushFileQuietly(file.toPath(), "producer-snapshot"));
} else {
    Utils.flushFileQuietly(file.toPath(), "producer-snapshot");
}
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
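The snippet above calls a flush helper that isn't shown in the comment. The following is only a guess at what a Utils.flushFileQuietly-style helper could look like, assuming it forces the channel to disk and logs rather than propagates I/O errors; the actual method added by the PR may differ in name, signature, and behavior.
```
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of the flush helper referenced above.
final class FlushUtilsSketch {
    // Forces the file's contents to disk; failures are logged rather than thrown so a
    // background flush cannot fail the caller.
    static void flushFileQuietly(Path path, String name) {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
            channel.force(true);
        } catch (IOException e) {
            System.err.println("Failed to flush " + name + " file at " + path + ": " + e);
        }
    }
}
```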
[GitHub] [kafka] jsancio commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
jsancio commented on code in PR #13765: URL: https://github.com/apache/kafka/pull/13765#discussion_r1213501789 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -1087,12 +1087,14 @@ class Partition(val topicPartition: TopicPartition, // avoid unnecessary collection generation val leaderLogEndOffset = leaderLog.logEndOffsetMetadata var newHighWatermark = leaderLogEndOffset -remoteReplicasMap.values.foreach { replica => +remoteReplicasMap.foreachEntry { (replicaId, replica) => Review Comment: Done and Done. I confirmed that the test I added fails for the "fenced" and "shutdown" variant against trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces
jolshan commented on code in PR #13794: URL: https://github.com/apache/kafka/pull/13794#discussion_r1213491922 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Replayable.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +/** + * The Replayable interface. + */ Review Comment: nit -- could we add a bit more detail here? Maybe something about replaying records to generate state etc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
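For context on the kind of documentation being requested ("something about replaying records to generate state"), here is a hedged sketch of a more descriptive Replayable interface. The generic parameter and method signature are illustrative assumptions; the actual interface body in the PR is not shown in the quoted diff.
```
/**
 * Sketch only: a Replayable object can rebuild its in-memory state by replaying the
 * records that were previously written to the coordinator's partition, for example
 * while loading a partition after a coordinator failover.
 */
public interface Replayable<U> {

    /**
     * Applies a single record to the state machine.
     *
     * @param record The record to replay.
     */
    void replay(U record);
}
```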
[GitHub] [kafka] jolshan commented on pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces
jolshan commented on PR #13794: URL: https://github.com/apache/kafka/pull/13794#issuecomment-1572526297 Let's add a brief explanation of the rationale for these interfaces to the PR description -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #13793: KAFKA-14462; [15/N] Make Result generic and rename it
dajac commented on PR #13793: URL: https://github.com/apache/kafka/pull/13793#issuecomment-1572521296 @jolshan Updated the description. This is basically a minor refactoring. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13793: KAFKA-14462; [15/N] Make Result generic and rename it
jolshan commented on PR #13793: URL: https://github.com/apache/kafka/pull/13793#issuecomment-1572514821 Can we update the description to give the rationale for this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jolshan commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213474996 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; + +/** + * Serializer which serializes {{@link Record}} to bytes. + */ +public class RecordSerializer implements PartitionWriter.Serializer { +@Override +public byte[] serializeKey(Record record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( Review Comment: I guess I was wondering why we have a unique class here for this. It makes it seem like we would want other implementations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jolshan commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213473971 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Append the messages to the local replica logs + * Append the messages to the local replica logs. ReplicaManager#appendRecords should usually be + * used instead of this method. Review Comment: Those reasons make sense. The append path is pretty complex, so an overall refactor could do some good. But doesn't need to be a blocker here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
jolshan commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213469954 ## core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.cluster.PartitionListener +import kafka.server.{ReplicaManager, RequestLocal} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType} +import org.apache.kafka.common.record.Record.EMPTY_HEADERS +import org.apache.kafka.common.utils.Time +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.AppendOrigin + +import java.nio.ByteBuffer +import java.util +import scala.collection.Map + +private[group] class ListenerAdaptor( + val listener: PartitionWriter.Listener +) extends PartitionListener { + override def onHighWatermarkUpdated( +tp: TopicPartition, +offset: Long + ): Unit = { +listener.onHighWatermarkUpdated(tp, offset) + } + + override def equals(that: Any): Boolean = that match { +case other: ListenerAdaptor => listener.equals(other.listener) +case _ => false + } + + override def hashCode(): Int = { +listener.hashCode() + } + + override def toString: String = { +s"ListenerAdaptor(listener=$listener)" + } +} + +class PartitionWriterImpl[T]( + replicaManager: ReplicaManager, + serializer: PartitionWriter.Serializer[T], + compressionType: CompressionType, + time: Time +) extends PartitionWriter[T] { + + override def registerListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener)) + } + + override def deregisterListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.removeListener(tp, new ListenerAdaptor(listener)) + } + + override def append( +tp: TopicPartition, +records: util.List[T] + ): Long = { +if (records.isEmpty) { + throw new IllegalStateException("records must be non-empty.") +} + +replicaManager.getLogConfig(tp) match { + case Some(logConfig) => +val magic = logConfig.recordVersion.value +val maxBatchSize = logConfig.maxMessageSize +val currentTimeMs = time.milliseconds() + +val recordsBuilder = MemoryRecords.builder( + ByteBuffer.allocate(math.min(16384, maxBatchSize)), + magic, + compressionType, + TimestampType.CREATE_TIME, + 0L, + maxBatchSize +) + +records.forEach { record => + val keyBytes = serializer.serializeKey(record) + val valueBytes = serializer.serializeValue(record) + + if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, 
EMPTY_HEADERS)) { +recordsBuilder.append( + currentTimeMs, + keyBytes, + valueBytes, + EMPTY_HEADERS +) + } else { +throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + + s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") + } +} + +val appendResults = replicaManager.appendToLocalLog( Review Comment: I'm not sure if it is maybe better to include a comment or include it somehow in the name. Hard coding is fine as long as it isn't a surprise 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
jolshan commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213460495 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread( +threadPrefix + threadId +) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. 
+ */ +private class EventProcessorThread extends Thread { +private final Logger log; + +EventProcessorThread( +String name +) { +super(name); +log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class); +setDaemon(false); +} + +private void handleEvents() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); +if (event != null) { +try { +log.debug("Executing event: {}.", event); +event.run(); +} catch (Throwable t) { +log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t); +event.complete(t); +} finally { +accumulator.done(event); +} +} +} +} + +private void drainEvents() { +CoordinatorEvent event = accumulator.poll(0, TimeUnit.MILLISECONDS); +while (event != null) { +try { +log.debug("Draining event: {}.", event); +event.complete(new RejectedExecutionException("EventProcessor is closed.")); +} catch (Throwable t) { +log.error("Failed to reject event {} due to: {}.", event, t.getMessage(), t); +} finally { +accumulator.done(event); +} + +event = accumulator.poll(0, TimeUnit.MILLISECONDS); +} +} + +@Override +public void run() { +log.info("Starting"); + +try { +handleEvents(); +} catch (Throwable t) { +log.error("Exiting with exception.", t); +} + +// The accumulator is drained and all the pending events are rejected +// when the event processor is
[GitHub] [kafka] philipnee closed pull request #13605: KAFKA-14950: implement assign() and assignment()
philipnee closed pull request #13605: KAFKA-14950: implement assign() and assignment() URL: https://github.com/apache/kafka/pull/13605 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #13605: KAFKA-14950: implement assign() and assignment()
philipnee commented on PR #13605: URL: https://github.com/apache/kafka/pull/13605#issuecomment-1572469830 Closing this in light of https://github.com/apache/kafka/pull/13797 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee opened a new pull request, #13797: KAFKA-14950: implement assign() and assignment()
philipnee opened a new pull request, #13797: URL: https://github.com/apache/kafka/pull/13797 In this PR I implemented assign() and assignment() and ported the original tests from KafkaConsumerTest.java. Differences from the original implementation: we explicitly send a commit event to the background thread to commit the progress, and the MetadataUpdate will also be sent to the background thread. Missing: Fetcher to clear the buffer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
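The description above mentions handing commit and metadata-update work to a background thread as explicit events. The sketch below illustrates that general hand-off pattern only; all class and event names here are invented for illustration and are not the consumer internals added by the PR.
```
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Rough sketch of an application-thread-to-background-thread event channel.
final class BackgroundEventChannelSketch {
    interface ApplicationEvent { }
    static final class CommitApplicationEvent implements ApplicationEvent { }
    static final class MetadataUpdateApplicationEvent implements ApplicationEvent { }

    private final BlockingQueue<ApplicationEvent> queue = new LinkedBlockingQueue<>();

    // Called from the application thread, e.g. from assign(), to hand work to the background thread.
    void enqueue(ApplicationEvent event) {
        queue.add(event);
    }

    // The background thread drains and processes events one at a time.
    void pollAndProcess() throws InterruptedException {
        ApplicationEvent event = queue.take();
        if (event instanceof CommitApplicationEvent) {
            // commit the consumed offsets here
        } else if (event instanceof MetadataUpdateApplicationEvent) {
            // request a metadata update here
        }
    }
}
```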
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
vcrfxia commented on code in PR #13756: URL: https://github.com/apache/kafka/pull/13756#discussion_r1212267772 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +public class RocksDBTimeOrderedKeyValueBuffer extends WrappedStateStore implements TimeOrderedKeyValueBuffer { + +private final Duration gracePeriod; +private long bufferSize; +private long minTimestamp; +private int numRec; +private Serde keySerde; +private FullChangeSerde valueSerde; Review Comment: Hmm, interesting that the serde type here is for `Change` instead of plain `V`. Looks like the reason that's the case is because `TimeOrderedKeyValueBuffer` stores `Change`. What do you think about updating `TimeOrderedKeyValueBuffer` to take a third generic type to represent the type stored in the actual buffer itself, in order to allow this new buffer implementation to simply store `V` instead? (The buffer type would be `Change` for the existing implementations, and just `V` for this new implementation.) That would simplify a lot of code in this class which doesn't have use cases for old values anyway (also evidenced by the fact that `priorValueForBuffered()` always returns null), and callers of this class won't need to pass in `null` for the old value in `Change` either. It'll also make the serialized value smaller, since we won't have to serialize in nulls for `oldValue` and `priorValue` anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
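The suggestion above is to add a third generic parameter so the type stored in the buffer no longer has to be Change&lt;V&gt;. A minimal sketch of what that could look like, with a simplified, assumed method signature rather than the real interface:
```
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

// Illustrative only: K/V are the logical key and value types; T is the type physically
// stored in the buffer (Change<V> for the existing in-memory buffers, plain V for the
// new RocksDB-backed buffer, which has no old-value use case).
public interface TimeOrderedKeyValueBufferSketch<K, V, T> extends StateStore {

    void put(long time, Record<K, T> record, ProcessorRecordContext recordContext);
}
```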
[GitHub] [kafka] jolshan commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager
jolshan commented on code in PR #13769: URL: https://github.com/apache/kafka/pull/13769#discussion_r1213389903 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## @@ -93,17 +93,22 @@ class AddPartitionsToTxnManagerTest { val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]() val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]() +val transaction1AgainErrorsOldEpochAgain = mutable.Map[TopicPartition, Errors]() Review Comment: We are adding transaction 1 again (retry cases) once with old epoch, once with new epoch, and again with an older epoch. I can rename and add comments to make this clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager
jolshan commented on code in PR #13769: URL: https://github.com/apache/kafka/pull/13769#discussion_r1213389003 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## @@ -93,17 +93,22 @@ class AddPartitionsToTxnManagerTest { val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]() val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]() +val transaction1AgainErrorsOldEpochAgain = mutable.Map[TopicPartition, Errors]() + // Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data and send a retriable response. addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1AgainErrorsOldEpoch)) val expectedNetworkErrors = topicPartitions.map(_ -> Errors.NETWORK_EXCEPTION).toMap assertEquals(expectedNetworkErrors, transaction1Errors) Review Comment: I never populate those fields. We either populate the field via the callback when we return early due to the epoch/retry case, or we return after receiving the response. Since this test is just checking the addTxnData path, I didn't mock receiving a response. Because of that, the callback is never executed and the field is not populated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213267178 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Append the messages to the local replica logs + * Append the messages to the local replica logs. ReplicaManager#appendRecords should usually be + * used instead of this method. Review Comment: I did not use it here because it puts stuff into the action queue and I don't want/need this in my case. The callback is also annoying. Let me circle back on this to see if I can do better. I wrote this a while ago and I admit that I went with the easiest solution back then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft
mimaison merged PR #13792: URL: https://github.com/apache/kafka/pull/13792 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration
jsancio commented on code in PR #13788: URL: https://github.com/apache/kafka/pull/13788#discussion_r1213346871 ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -33,6 +33,92 @@ public class PartitionRegistration { + +/** + * A builder class which creates a PartitionRegistration. + */ +static public class Builder { +private int[] replicas; +private int[] isr; +private int[] removingReplicas = Replicas.NONE; +private int[] addingReplicas = Replicas.NONE; +private Integer leader; +private LeaderRecoveryState leaderRecoveryState; +private Integer leaderEpoch; +private Integer partitionEpoch; + +public Builder setReplicas(int[] replicas) { +this.replicas = replicas; +return this; +} + +public Builder setIsr(int[] isr) { +this.isr = isr; +return this; +} + +public Builder setRemovingReplicas(int[] removingReplicas) { +this.removingReplicas = removingReplicas; +return this; +} + +public Builder setAddingReplicas(int[] addingReplicas) { +this.addingReplicas = addingReplicas; +return this; +} + +public Builder setLeader(Integer leader) { +this.leader = leader; +return this; +} + +public Builder setLeaderRecoveryState(LeaderRecoveryState leaderRecoveryState) { +this.leaderRecoveryState = leaderRecoveryState; +return this; +} + +public Builder setLeaderEpoch(Integer leaderEpoch) { +this.leaderEpoch = leaderEpoch; +return this; +} + +public Builder setPartitionEpoch(Integer partitionEpoch) { +this.partitionEpoch = partitionEpoch; +return this; +} + +public PartitionRegistration build() { +if (replicas == null) { +throw new IllegalStateException("You must set replicas."); +} else if (isr == null) { +throw new IllegalStateException("You must set isr."); +} else if (removingReplicas == null) { +throw new IllegalStateException("You must set removing replicas."); +} else if (addingReplicas == null) { +throw new IllegalStateException("You must set adding replicas."); +} else if (leader == null) { +throw new IllegalStateException("You must set leader."); +} else if (leaderRecoveryState == null) { +throw new IllegalStateException("You must set leader recovery state."); Review Comment: We discussed this PR offline. The change is good as is and there is not much benefit in adding additional default values to the builder. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft
mimaison commented on code in PR #13792: URL: https://github.com/apache/kafka/pull/13792#discussion_r1213336329 ## docs/upgrade.html: ## @@ -59,6 +59,65 @@ Notable changes in 3 +Upgrading ZooKeeper-based clusters +If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. +Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1. + +For a rolling upgrade: + + +Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you +are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously +overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior +to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.4, 3.3, etc.) +log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact +following the upgrade for the details on what this configuration does.) + +If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override +the inter-broker protocol version. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.4, 3.3, etc.) + + +Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the +brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. +It is still possible to downgrade at this point if there are any problems. + +Once the cluster's behavior and performance has been verified, bump the protocol version by editing +inter.broker.protocol.version and setting it to 3.5. + +Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest +protocol version, it will no longer be possible to downgrade the cluster to an older version. + +If you have overridden the message format version as instructed above, then you need to do one more rolling restart to +upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, +change log.message.format.version to 3.5 on each broker and restart them one by one. Note that the older Scala clients, +which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs +(or to take advantage of exactly once semantics), +the newer Java clients must be used. + + + +Upgrading KRaft-based clusters +If you are upgrading from a version prior to 3.3.0, please see the note below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0. + +For a rolling upgrade: + + +Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the +brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. 
+ +Once the cluster's behavior and performance has been verified, bump the metadata.version by running + +./bin/kafka-features.sh upgrade --metadata 3.5 + + +Note that the cluster metadata version cannot be downgraded to a pre-production 3.0.x, 3.1.x, or 3.2.x version once it has been upgraded. +However, it is possible to downgrade to production versions such as 3.3-IV0, 3.3-IV1, etc. Review Comment: This is copied from the upgrade steps for 3.4. I agree this could be clarified but I'd prefer to keep it as is for now so I can backport this to 3.5. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
jsancio commented on PR #13765: URL: https://github.com/apache/kafka/pull/13765#issuecomment-1572275711 Thanks for your feedback @divijvaidya @dajac. I am replying to both of your comments in this message. > With this change, we are changing the semantics of what a leadership epoch means. Prior to this change, leadership epoch is a version number representing membership of an ISR. As soon as membership changes, this version changes. @divijvaidya, The old code was increasing the leader epoch when the ISR shrinks but not when it expands. My understanding is that we were doing this because the old replica manager used a leader epoch bump to invalidate old fetchers. During shutdown the fetchers needed to be invalidated to avoid having them rejoin the ISR. With KIP-841, this is no longer necessary as we can reject brokers that are shutting down from joining the ISR and modifying the HWM. Part of the code for doing this already exists; what was missing, and what part of this PR fixes, is considering this state when advancing the HWM. > After this change, the definition has changed to - leadership epoch is a version number that represents member of an ISR "in some cases". As you can see, the new definition has added ifs and buts to the simple definition above. Hence, I am not in favour of changing this. @divijvaidya, For correctness, the main requirement is that the leader epoch is increased whenever the leader changes. This is needed for log truncation and reconciliation. For log consistency, log truncation and reconciliation assume that the tuple (offset, epoch) is unique per topic partition and that if the tuple (offset, epoch) matches in two replicas then their logs up to that offset also match. In my opinion, for correctness Kafka doesn't require that the leader epoch is increased when the ISR changes. > As you can see there, we only reset the followers' states when the leader epoch is bumped. I suppose that this is why you stumbled upon this issue with having shutting down replicas holding back advancing the HWM. The issue is that the shutting down replica's state is not reset so it remains caught-up for replica.lag.time.max.ms. I think that we need to update Partition.makeLeader to always update the followers' states. Obviously, we also need your changes to not consider fenced and shutting down replicas in the HWM computation. @dajac, Yes, I thought about this when I implemented this feature. I decided against it because the follower (the shutting down replica) is technically "caught up" to the leader; we simply don't want the leader to wait for the replica when computing the HWM since we know it will soon be shutting down its fetchers. > I wonder if we really need this if we change Partition.makeLeader as explained. It seems to me that the change in Partition.makeLeader and Partition.maybeIncrementLeaderHW together should be backward compatible. What do you think? We need the MV check in the controller even with your suggestion. The question is "When is it beneficial for the controller to not increase the leader epoch when a replica is removed from the ISR because of shutdown?" This is only the case when the controller knows that the brokers have the replica manager fixes in this PR. That is guaranteed to be the case if the MV is greater than the MV introduced in this PR.
If the brokers don't contain the fixes in this PR and the controller doesn't bump the leader epoch, PRODUCE requests will time out because the HWM increase will be delayed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
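To make the HWM point concrete, here is a conceptual sketch of the rule being discussed: the leader ignores fenced or shutting-down replicas when taking the minimum log end offset, so a stopping broker cannot hold the high watermark back. This is not the actual Partition.scala code; names and structure are simplified assumptions.
```
import java.util.Map;
import java.util.Set;

// Conceptual sketch of the high-watermark rule under discussion.
final class HighWatermarkSketch {
    static long maybeNewHighWatermark(long leaderLogEndOffset,
                                      Map<Integer, Long> followerLogEndOffsets,
                                      Set<Integer> fencedOrShuttingDownReplicas) {
        long newHighWatermark = leaderLogEndOffset;
        for (Map.Entry<Integer, Long> entry : followerLogEndOffsets.entrySet()) {
            if (fencedOrShuttingDownReplicas.contains(entry.getKey())) {
                continue; // a fenced or shutting-down replica must not hold the HWM back
            }
            newHighWatermark = Math.min(newHighWatermark, entry.getValue());
        }
        return newHighWatermark;
    }
}
```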
[GitHub] [kafka] mimaison commented on a diff in pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft
mimaison commented on code in PR #13792: URL: https://github.com/apache/kafka/pull/13792#discussion_r121808 ## docs/upgrade.html: ## @@ -59,6 +59,65 @@ Notable changes in 3 +Upgrading ZooKeeper-based clusters +If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Review Comment: I changed it to `the note in step 3 below` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft
mimaison commented on code in PR #13792: URL: https://github.com/apache/kafka/pull/13792#discussion_r1213332860 ## docs/upgrade.html: ## @@ -59,6 +59,65 @@ Notable changes in 3 +Upgrading ZooKeeper-based clusters +If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. +Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1. + +For a rolling upgrade: + + +Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you +are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously +overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior +to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.4, 3.3, etc.) +log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact +following the upgrade for the details on what this configuration does.) + +If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override +the inter-broker protocol version. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.4, 3.3, etc.) + + +Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the +brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. +It is still possible to downgrade at this point if there are any problems. + +Once the cluster's behavior and performance has been verified, bump the protocol version by editing +inter.broker.protocol.version and setting it to 3.5. + +Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest +protocol version, it will no longer be possible to downgrade the cluster to an older version. + +If you have overridden the message format version as instructed above, then you need to do one more rolling restart to +upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, +change log.message.format.version to 3.5 on each broker and restart them one by one. Note that the older Scala clients, +which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs +(or to take advantage of exactly once semantics), +the newer Java clients must be used. + + + +Upgrading KRaft-based clusters +If you are upgrading from a version prior to 3.3.0, please see the note below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0. Review Comment: I changed it to `the note in step 5 below` ## docs/upgrade.html: ## @@ -59,6 +59,65 @@ Notable changes in 3 +Upgrading ZooKeeper-based clusters +If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. +Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1. + +For a rolling upgrade: + + +Update server.properties on all brokers and add the following properties. 
CURRENT_KAFKA_VERSION refers to the version you +are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously +overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior +to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.4, 3.3, etc.) +log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact +following the upgrade for the details on what this configuration does.) + +If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to
[jira] [Commented] (KAFKA-15017) New ClientQuotas are not written to ZK from snapshot
[ https://issues.apache.org/jira/browse/KAFKA-15017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728390#comment-17728390 ] Proven Provenzano commented on KAFKA-15017: --- Fix is on 3.5 branch. > New ClientQuotas are not written to ZK from snapshot > - > > Key: KAFKA-15017 > URL: https://issues.apache.org/jira/browse/KAFKA-15017 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.5.0 >Reporter: David Arthur >Assignee: Proven Provenzano >Priority: Critical > > Similar issue to KAFKA-15009 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15017) New ClientQuotas are not written to ZK from snapshot
[ https://issues.apache.org/jira/browse/KAFKA-15017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Proven Provenzano resolved KAFKA-15017. --- Resolution: Fixed > New ClientQuotas are not written to ZK from snapshot > - > > Key: KAFKA-15017 > URL: https://issues.apache.org/jira/browse/KAFKA-15017 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.5.0 >Reporter: David Arthur >Assignee: Proven Provenzano >Priority: Critical > > Similar issue to KAFKA-15009 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] andymg3 commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration
andymg3 commented on code in PR #13788: URL: https://github.com/apache/kafka/pull/13788#discussion_r1213276160 ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -33,6 +33,92 @@ public class PartitionRegistration { + +/** + * A builder class which creates a PartitionRegistration. + */ +static public class Builder { +private int[] replicas; +private int[] isr; +private int[] removingReplicas = Replicas.NONE; +private int[] addingReplicas = Replicas.NONE; +private Integer leader; +private LeaderRecoveryState leaderRecoveryState; +private Integer leaderEpoch; +private Integer partitionEpoch; + +public Builder setReplicas(int[] replicas) { +this.replicas = replicas; +return this; +} + +public Builder setIsr(int[] isr) { +this.isr = isr; +return this; +} + +public Builder setRemovingReplicas(int[] removingReplicas) { +this.removingReplicas = removingReplicas; +return this; +} + +public Builder setAddingReplicas(int[] addingReplicas) { +this.addingReplicas = addingReplicas; +return this; +} + +public Builder setLeader(Integer leader) { +this.leader = leader; +return this; +} + +public Builder setLeaderRecoveryState(LeaderRecoveryState leaderRecoveryState) { +this.leaderRecoveryState = leaderRecoveryState; +return this; +} + +public Builder setLeaderEpoch(Integer leaderEpoch) { +this.leaderEpoch = leaderEpoch; +return this; +} + +public Builder setPartitionEpoch(Integer partitionEpoch) { +this.partitionEpoch = partitionEpoch; +return this; +} + +public PartitionRegistration build() { +if (replicas == null) { +throw new IllegalStateException("You must set replicas."); +} else if (isr == null) { +throw new IllegalStateException("You must set isr."); +} else if (removingReplicas == null) { +throw new IllegalStateException("You must set removing replicas."); +} else if (addingReplicas == null) { +throw new IllegalStateException("You must set adding replicas."); +} else if (leader == null) { +throw new IllegalStateException("You must set leader."); +} else if (leaderRecoveryState == null) { +throw new IllegalStateException("You must set leader recovery state."); Review Comment: Should we go a step further and not require `leader`, `isr`, `leaderEpoch` and `partitionEpoch`? We could default to using the first replica in `replicas` for the leader. We could default to all replicas being in the ISR and we could default `leaderEpoch` and `partitionEpoch` to 0? It looks like we really are just calling `build` during topic creation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
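A rough sketch of the defaulting this comment suggests (the thread above ultimately decided not to adopt it). The class below is a simplified stand-in, not the real PartitionRegistration.Builder, and Arrays.copyOf stands in for whatever helper the real code would use.
```
import java.util.Arrays;

// Hypothetical, simplified builder illustrating the suggested defaults.
final class PartitionRegistrationBuilderSketch {
    private int[] replicas;
    private int[] isr;
    private Integer leader;
    private Integer leaderEpoch;
    private Integer partitionEpoch;

    PartitionRegistrationBuilderSketch setReplicas(int[] replicas) { this.replicas = replicas; return this; }
    PartitionRegistrationBuilderSketch setIsr(int[] isr) { this.isr = isr; return this; }
    PartitionRegistrationBuilderSketch setLeader(Integer leader) { this.leader = leader; return this; }

    void build() {
        if (replicas == null || replicas.length == 0) {
            throw new IllegalStateException("You must set a non-empty replicas array.");
        }
        if (isr == null) isr = Arrays.copyOf(replicas, replicas.length); // default: all replicas in the ISR
        if (leader == null) leader = replicas[0];                        // default: the first replica leads
        if (leaderEpoch == null) leaderEpoch = 0;
        if (partitionEpoch == null) partitionEpoch = 0;
        // a real build() would now construct and return the PartitionRegistration
    }
}
```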
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213274631 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; + +/** + * Serializer which serializes {{@link Record}} to bytes. + */ +public class RecordSerializer implements PartitionWriter.Serializer { +@Override +public byte[] serializeKey(Record record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( Review Comment: We use the very same code `MessageUtil.toVersionPrefixedBytes` in scala. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
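For readers unfamiliar with the layout being discussed: a "version-prefixed" key is, conceptually, a short version number followed by the serialized payload. The following self-contained sketch shows that general shape only; it is not Kafka's MessageUtil, and the helper name and byte layout here are assumptions for illustration.

import java.nio.ByteBuffer;

// Illustrative only: a 2-byte version followed by the payload bytes, which is
// the general idea of a version-prefixed record key.
public class VersionPrefixSketch {
    static byte[] toVersionPrefixedBytes(short version, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(2 + payload.length);
        buffer.putShort(version);   // version prefix
        buffer.put(payload);        // serialized message body
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] bytes = toVersionPrefixedBytes((short) 3, "group-metadata-key".getBytes());
        System.out.println(bytes.length + " bytes, version=" + ByteBuffer.wrap(bytes).getShort());
    }
}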
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213273088 ## core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala: ## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.{LogAppendResult, ReplicaManager, RequestLocal} +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, RecordTooLargeException} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordConversionStats} +import org.apache.kafka.common.utils.{MockTime, Time} +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.{AppendOrigin, LeaderHwChange, LogAppendInfo, LogConfig} +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.Test +import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import org.mockito.Mockito.{mock, verify, when} + +import java.nio.charset.Charset +import java.util.{Collections, Optional, OptionalInt, Properties} +import scala.jdk.CollectionConverters._ + +class StringKeyValueSerializer extends PartitionWriter.Serializer[(String, String)] { + override def serializeKey(record: (String, String)): Array[Byte] = { +record._1.getBytes(Charset.defaultCharset()) + } + + override def serializeValue(record: (String, String)): Array[Byte] = { +record._2.getBytes(Charset.defaultCharset()) + } +} + +class PartitionWriterImplTest { + @Test + def testRegisterDeregisterListener(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM +) + +val listener = new PartitionWriter.Listener { + override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = {} +} + +partitionRecordWriter.registerListener(tp, listener) +verify(replicaManager).maybeAddListener(tp, new ListenerAdaptor(listener)) + +partitionRecordWriter.deregisterListener(tp, listener) +verify(replicaManager).removeListener(tp, new ListenerAdaptor(listener)) + +assertEquals( + new ListenerAdaptor(listener), + new ListenerAdaptor(listener) +) +assertEquals( + new ListenerAdaptor(listener).hashCode(), + new ListenerAdaptor(listener).hashCode() +) + } + + @Test + def testWriteRecords(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val time = new MockTime() +val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new 
StringKeyValueSerializer(), + CompressionType.NONE, + time +) + +when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() +))) + +val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]]) +when(replicaManager.appendToLocalLog( + ArgumentMatchers.eq(true), + ArgumentMatchers.eq(AppendOrigin.COORDINATOR), + recordsCapture.capture(), + ArgumentMatchers.eq(1), + ArgumentMatchers.eq(RequestLocal.NoCaching) +)).thenReturn(Map(tp -> LogAppendResult(new LogAppendInfo( + Optional.empty(), + 10, + OptionalInt.empty(), + RecordBatch.NO_TIMESTAMP, + -1L, + RecordBatch.NO_TIMESTAMP, + -1L, + RecordConversionStats.EMPTY, + CompressionType.NONE, + CompressionType.NONE, + -1, + -1, + false, + -1L, + Collections.emptyList(), + "", + LeaderHwChange.INCREASED + + +val records = List( + ("k0", "v0"), + ("k1", "v1"), + ("k2", "v2"), +) + +assertEquals(11, partitionRecordWriter.append( + tp, + records.asJava +)) + +verify(replicaManag
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213271843 ## core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala: ## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.{LogAppendResult, ReplicaManager, RequestLocal} +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, RecordTooLargeException} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordConversionStats} +import org.apache.kafka.common.utils.{MockTime, Time} +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.{AppendOrigin, LeaderHwChange, LogAppendInfo, LogConfig} +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.Test +import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import org.mockito.Mockito.{mock, verify, when} + +import java.nio.charset.Charset +import java.util.{Collections, Optional, OptionalInt, Properties} +import scala.jdk.CollectionConverters._ + +class StringKeyValueSerializer extends PartitionWriter.Serializer[(String, String)] { + override def serializeKey(record: (String, String)): Array[Byte] = { +record._1.getBytes(Charset.defaultCharset()) + } + + override def serializeValue(record: (String, String)): Array[Byte] = { +record._2.getBytes(Charset.defaultCharset()) + } +} + +class PartitionWriterImplTest { + @Test + def testRegisterDeregisterListener(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM +) + +val listener = new PartitionWriter.Listener { + override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = {} +} + +partitionRecordWriter.registerListener(tp, listener) +verify(replicaManager).maybeAddListener(tp, new ListenerAdaptor(listener)) + +partitionRecordWriter.deregisterListener(tp, listener) +verify(replicaManager).removeListener(tp, new ListenerAdaptor(listener)) + +assertEquals( + new ListenerAdaptor(listener), + new ListenerAdaptor(listener) +) +assertEquals( + new ListenerAdaptor(listener).hashCode(), + new ListenerAdaptor(listener).hashCode() +) + } + + @Test + def testWriteRecords(): Unit = { +val tp = new TopicPartition("foo", 0) +val replicaManager = mock(classOf[ReplicaManager]) +val time = new MockTime() +val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new 
StringKeyValueSerializer(), + CompressionType.NONE, + time +) + +when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() +))) + +val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]]) +when(replicaManager.appendToLocalLog( + ArgumentMatchers.eq(true), + ArgumentMatchers.eq(AppendOrigin.COORDINATOR), + recordsCapture.capture(), + ArgumentMatchers.eq(1), + ArgumentMatchers.eq(RequestLocal.NoCaching) +)).thenReturn(Map(tp -> LogAppendResult(new LogAppendInfo( + Optional.empty(), + 10, + OptionalInt.empty(), + RecordBatch.NO_TIMESTAMP, + -1L, + RecordBatch.NO_TIMESTAMP, + -1L, + RecordConversionStats.EMPTY, + CompressionType.NONE, + CompressionType.NONE, + -1, + -1, + false, + -1L, + Collections.emptyList(), + "", + LeaderHwChange.INCREASED + + +val records = List( + ("k0", "v0"), + ("k1", "v1"), + ("k2", "v2"), +) + +assertEquals(11, partitionRecordWriter.append( + tp, + records.asJava +)) + +verify(replicaManag
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213268934 ## core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.cluster.PartitionListener +import kafka.server.{ReplicaManager, RequestLocal} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType} +import org.apache.kafka.common.record.Record.EMPTY_HEADERS +import org.apache.kafka.common.utils.Time +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.AppendOrigin + +import java.nio.ByteBuffer +import java.util +import scala.collection.Map + +private[group] class ListenerAdaptor( + val listener: PartitionWriter.Listener +) extends PartitionListener { + override def onHighWatermarkUpdated( +tp: TopicPartition, +offset: Long + ): Unit = { +listener.onHighWatermarkUpdated(tp, offset) + } + + override def equals(that: Any): Boolean = that match { +case other: ListenerAdaptor => listener.equals(other.listener) +case _ => false + } + + override def hashCode(): Int = { +listener.hashCode() + } + + override def toString: String = { +s"ListenerAdaptor(listener=$listener)" + } +} + +class PartitionWriterImpl[T]( + replicaManager: ReplicaManager, + serializer: PartitionWriter.Serializer[T], + compressionType: CompressionType, + time: Time +) extends PartitionWriter[T] { + + override def registerListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener)) + } + + override def deregisterListener( +tp: TopicPartition, +listener: PartitionWriter.Listener + ): Unit = { +replicaManager.removeListener(tp, new ListenerAdaptor(listener)) + } + + override def append( +tp: TopicPartition, +records: util.List[T] + ): Long = { +if (records.isEmpty) { + throw new IllegalStateException("records must be non-empty.") +} + +replicaManager.getLogConfig(tp) match { + case Some(logConfig) => +val magic = logConfig.recordVersion.value +val maxBatchSize = logConfig.maxMessageSize +val currentTimeMs = time.milliseconds() + +val recordsBuilder = MemoryRecords.builder( + ByteBuffer.allocate(math.min(16384, maxBatchSize)), + magic, + compressionType, + TimestampType.CREATE_TIME, + 0L, + maxBatchSize +) + +records.forEach { record => + val keyBytes = serializer.serializeKey(record) + val valueBytes = serializer.serializeValue(record) + + if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, 
EMPTY_HEADERS)) { +recordsBuilder.append( + currentTimeMs, + keyBytes, + valueBytes, + EMPTY_HEADERS +) + } else { +throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + + s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") + } +} + +val appendResults = replicaManager.appendToLocalLog( Review Comment: Yeah, I agree. The alternative would be to pass it via the method but this would mean hardcoding it somewhere else. Is it better? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
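The alternative mentioned in the comment above, passing the batch size limit through the call instead of reading it from the log config inside the writer, would look roughly like the hypothetical interface below. The name and signature are assumptions for illustration, not the PR's API; the trade-off noted in the review is that some other layer would then have to know (and effectively hardcode) the limit.

import java.util.List;

// Hypothetical variant of a partition writer API where the caller supplies the
// maximum batch size rather than the writer deriving it from the LogConfig.
public interface SizedPartitionWriter<T> {
    long append(String topicPartition, List<T> records, int maxBatchSizeBytes);
}

Keeping the lookup inside the implementation, as the PR does, avoids duplicating that configuration knowledge elsewhere.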
[GitHub] [kafka] urbandan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
urbandan commented on PR #13796: URL: https://github.com/apache/kafka/pull/13796#issuecomment-1572188562 @viktorsomogyi Could you please review this fix? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter
dajac commented on code in PR #13675: URL: https://github.com/apache/kafka/pull/13675#discussion_r1213267178 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Append the messages to the local replica logs + * Append the messages to the local replica logs. ReplicaManager#appendRecords should usually be + * used instead of this method. Review Comment: I did not use it here because it puts things onto the action queue, and I don't want/need that in my case. Let me circle back on this to see if I can do better. I wrote this a while ago and I admit that I went with the easiest solution back then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] urbandan opened a new pull request, #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…
urbandan opened a new pull request, #13796: URL: https://github.com/apache/kafka/pull/13796 …atches to complete before resetting the sequence number The idempotent producer resets the sequence number when a batch fails with a non-retriable error. This can violate the idempotent guarantee if there are preceding, in-flight requests which are being retried, as their producer epoch and sequence number get rewritten, effectively granting those messages a new "identity". Instead, the producer should wait for the preceding, retried batches to complete before resetting the sequence number. This ensures that the sequence numbers can only get reset after the preceding batches are definitely completed. By calling markSequenceUnresolved instead of requestEpochBumpForPartition, draining batches from the partition is halted, and the epoch bump is delayed until all in-flight requests are completed. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
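As a rough mental model of the ordering the fix above enforces, the following self-contained toy defers the sequence reset until the in-flight count drains to zero. It is not Kafka's TransactionManager; every class and method name here is invented for illustration.

// Toy model of "defer the sequence reset until in-flight batches complete".
public class SequenceResetSketch {
    private int inFlightBatches = 0;
    private int nextSequence = 0;
    private boolean resetPending = false;

    void onBatchSent() {
        inFlightBatches++;
        nextSequence++;
    }

    // A non-retriable failure only *requests* the reset (markSequenceUnresolved-style).
    void onFatalBatchError() {
        resetPending = true;
    }

    void onBatchCompleted() {
        inFlightBatches--;
        // Only once nothing is in flight is it safe to rewrite sequences.
        if (resetPending && inFlightBatches == 0) {
            nextSequence = 0;
            resetPending = false;
        }
    }

    int nextSequence() {
        return nextSequence;
    }

    public static void main(String[] args) {
        SequenceResetSketch s = new SequenceResetSketch();
        s.onBatchSent();        // batch 0 in flight
        s.onBatchSent();        // batch 1 in flight
        s.onFatalBatchError();  // batch 1 fails fatally; reset is deferred
        s.onBatchCompleted();   // batch 0 (a retried batch) completes first
        s.onBatchCompleted();   // now the reset actually happens
        System.out.println("next sequence = " + s.nextSequence());  // 0
    }
}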
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
jeffkbkim commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213254844 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * The lock for protecting access to the resources. + */ +private final ReentrantLock lock; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.lock = new ReentrantLock(); +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread( +threadPrefix + threadId +) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. + */ +class EventProcessorThread extends Thread { +private final Logger log; + +EventProcessorThread( +String name +) { +super(name); +log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class); +setDaemon(false); +} + +private void handleEvents() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); Review Comment: makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
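For context on the invariant discussed in this thread (events sharing a partition key must never run concurrently), here is a minimal, self-contained way to obtain that guarantee. It is not how EventAccumulator or MultiThreadedEventProcessor is implemented; the class name and the one-executor-per-key approach are assumptions chosen only to make the invariant concrete.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Route each key to a dedicated single-threaded executor, so two events with
// the same key can never run at the same time.
public class KeyedSerialExecutorSketch {
    private final Map<Integer, ExecutorService> executors = new ConcurrentHashMap<>();

    public void submit(int key, Runnable event) {
        executors
            .computeIfAbsent(key, k -> Executors.newSingleThreadExecutor())
            .submit(event);
    }

    public void shutdown() {
        executors.values().forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        KeyedSerialExecutorSketch processor = new KeyedSerialExecutorSketch();
        for (int i = 0; i < 4; i++) {
            final int n = i;
            processor.submit(n % 2, () -> System.out.println("key=" + (n % 2) + " event=" + n));
        }
        processor.shutdown();
    }
}

The trade-off of this naive version is one thread per key; a shared thread pool over a keyed accumulator, as in the PR, avoids that while preserving the same guarantee.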
[GitHub] [kafka] jsancio commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration
jsancio commented on code in PR #13788: URL: https://github.com/apache/kafka/pull/13788#discussion_r1213240584 ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -33,6 +33,92 @@ public class PartitionRegistration { + +/** + * A builder class which creates a PartitionRegistration. + */ +static public class Builder { +private int[] replicas; +private int[] isr; +private int[] removingReplicas = Replicas.NONE; +private int[] addingReplicas = Replicas.NONE; +private Integer leader; +private LeaderRecoveryState leaderRecoveryState; +private Integer leaderEpoch; +private Integer partitionEpoch; + +public Builder setReplicas(int[] replicas) { +this.replicas = replicas; +return this; +} + +public Builder setIsr(int[] isr) { +this.isr = isr; +return this; +} + +public Builder setRemovingReplicas(int[] removingReplicas) { +this.removingReplicas = removingReplicas; +return this; +} + +public Builder setAddingReplicas(int[] addingReplicas) { +this.addingReplicas = addingReplicas; +return this; +} + +public Builder setLeader(Integer leader) { +this.leader = leader; +return this; +} + +public Builder setLeaderRecoveryState(LeaderRecoveryState leaderRecoveryState) { +this.leaderRecoveryState = leaderRecoveryState; +return this; +} + +public Builder setLeaderEpoch(Integer leaderEpoch) { +this.leaderEpoch = leaderEpoch; +return this; +} + +public Builder setPartitionEpoch(Integer partitionEpoch) { +this.partitionEpoch = partitionEpoch; +return this; +} + +public PartitionRegistration build() { +if (replicas == null) { +throw new IllegalStateException("You must set replicas."); +} else if (isr == null) { +throw new IllegalStateException("You must set isr."); +} else if (removingReplicas == null) { +throw new IllegalStateException("You must set removing replicas."); +} else if (addingReplicas == null) { +throw new IllegalStateException("You must set adding replicas."); +} else if (leader == null) { +throw new IllegalStateException("You must set leader."); +} else if (leaderRecoveryState == null) { +throw new IllegalStateException("You must set leader recovery state."); Review Comment: I should have mentioned in my previous review but how about having a default of this state too? The default should be `RECOVERED`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah merged pull request #13758: KAFKA-15010 ZK migration failover support
mumrah merged PR #13758: URL: https://github.com/apache/kafka/pull/13758 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a diff in pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft
tombentley commented on code in PR #13792: URL: https://github.com/apache/kafka/pull/13792#discussion_r1213226340 ## docs/upgrade.html: ## @@ -59,6 +59,65 @@ Notable changes in 3 +Upgrading ZooKeeper-based clusters +If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Review Comment: "the note below" is rather vague. Can use use an `` to link to it directly? ## docs/upgrade.html: ## @@ -59,6 +59,65 @@ Notable changes in 3 +Upgrading ZooKeeper-based clusters +If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. +Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1. + +For a rolling upgrade: + + +Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you +are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously +overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior +to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.4, 3.3, etc.) +log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact +following the upgrade for the details on what this configuration does.) + +If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override +the inter-broker protocol version. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.4, 3.3, etc.) + + +Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the +brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. +It is still possible to downgrade at this point if there are any problems. + +Once the cluster's behavior and performance has been verified, bump the protocol version by editing +inter.broker.protocol.version and setting it to 3.5. + +Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest +protocol version, it will no longer be possible to downgrade the cluster to an older version. + +If you have overridden the message format version as instructed above, then you need to do one more rolling restart to +upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, +change log.message.format.version to 3.5 on each broker and restart them one by one. Note that the older Scala clients, +which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs +(or to take advantage of exactly once semantics), +the newer Java clients must be used. + + + +Upgrading KRaft-based clusters +If you are upgrading from a version prior to 3.3.0, please see the note below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0. Review Comment: "the note below" is rather vague. Can use use an `` to link to it directly? 
## docs/upgrade.html: ## @@ -59,6 +59,65 @@ Notable changes in 3 +Upgrading ZooKeeper-based clusters +If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. +Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1. + +For a rolling upgrade: + + +Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you +are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously +overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior +to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. + +inter.b
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213232889 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * The lock for protecting access to the resources. + */ +private final ReentrantLock lock; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.lock = new ReentrantLock(); +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread( +threadPrefix + threadId +) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. + */ +class EventProcessorThread extends Thread { +private final Logger log; + +EventProcessorThread( +String name +) { +super(name); +log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class); +setDaemon(false); +} + +private void handleEvents() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); Review Comment: It should not but it could. Logging twice in this case is not an issue, I think. I would keep it as it is. beginShutdown is not necessarily called via close. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213222820 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java: ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +/** + * The base event type used by all events processed in the + * coordinator runtime. + */ +public interface CoordinatorEvent extends EventAccumulator.Event { +/** Review Comment: I think that we do both in the code base but I can add it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
jeffkbkim commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213221009 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * The lock for protecting access to the resources. + */ +private final ReentrantLock lock; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.lock = new ReentrantLock(); +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread( +threadPrefix + threadId +) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. + */ +class EventProcessorThread extends Thread { +private final Logger log; + +EventProcessorThread( +String name +) { +super(name); +log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class); +setDaemon(false); +} + +private void handleEvents() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); Review Comment: can MultiThreadedEventProcessor#close() be called multiple times? we also have logging there as well. if beginShutdown is only called through close() then maybe we can synchronize there -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
jeffkbkim commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213191767 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java: ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +/** + * The base event type used by all events processed in the + * coordinator runtime. + */ +public interface CoordinatorEvent extends EventAccumulator.Event { +/** Review Comment: nit: should we have a newline here? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Timeout(value = 60) +public class MultiThreadedEventProcessorTest { + +private static class FutureEvent implements CoordinatorEvent { +private final int key; +private final CompletableFuture future; +private final Supplier supplier; + +FutureEvent( +int key, +Supplier supplier +) { +this.key = key; +this.future = new CompletableFuture<>(); +this.supplier = supplier; +} + +@Override +public void run() { +future.complete(supplier.get()); +} + +@Override +public void complete(Throwable ex) { +future.completeExceptionally(ex); +} + +@Override +public Integer key() { +return key; +} + +public CompletableFuture future() { +return future; +} + +@Override +public String toString() { +return "FutureEvent(key=" + key + ")"; +} +} + +@Test +public void testCreateAndClose() throws Exception { +CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor( +new LogContext(), +"event-processor-", +2 +); +eventProcessor.close(); +} + +@Test +public void testEventsAreProcessed() throws Exception { Review Comment: thanks, looks good! ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213213734 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * The lock for protecting access to the resources. + */ +private final ReentrantLock lock; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.lock = new ReentrantLock(); +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread( +threadPrefix + threadId +) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. + */ +class EventProcessorThread extends Thread { +private final Logger log; + +EventProcessorThread( +String name +) { +super(name); +log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class); +setDaemon(false); +} + +private void handleEvents() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); Review Comment: Removed the lock. I have put `synchronized` to `beginShutdown` instead. This is needed because `shuttingDown` is accessed twice for the logging message as you pointed out. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
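The shutdown pattern settled on in this exchange (a volatile flag read by the worker threads, with a synchronized beginShutdown so concurrent callers cannot race on the check-then-log) can be sketched as below. This is an illustration of the pattern only, not the PR's MultiThreadedEventProcessor; the class name and log strings are invented.

// Minimal sketch of an idempotent, synchronized beginShutdown over a volatile flag.
public class ShutdownFlagSketch {
    private volatile boolean shuttingDown = false;

    public synchronized void beginShutdown() {
        if (shuttingDown) {
            System.out.println("Event processor is already shutting down.");
            return;
        }
        System.out.println("Shutting down event processor.");
        shuttingDown = true;   // worker loops observe this without taking the lock
    }

    // What a worker loop's termination check looks like against the volatile flag.
    public boolean keepRunning() {
        return !shuttingDown;
    }

    public static void main(String[] args) {
        ShutdownFlagSketch p = new ShutdownFlagSketch();
        p.beginShutdown();
        p.beginShutdown();   // second call only logs "already shutting down"
        System.out.println("keepRunning=" + p.keepRunning());
    }
}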
[GitHub] [kafka] dajac commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager
dajac commented on code in PR #13769: URL: https://github.com/apache/kafka/pull/13769#discussion_r1213186879 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -36,45 +36,58 @@ class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransac val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) -class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { - + private val inflightNodes = mutable.HashSet[Node]() private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() - + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { nodesToTransactions.synchronized { // Check if we have already have either node or individual transaction. Add the Node if it isn't there. - val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, + val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, new TransactionDataAndCallbacks( new AddPartitionsToTxnTransactionCollection(1), mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) - val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) - - // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and - // reconnected so return the retriable network exception. - if (currentTransactionData != null) { -val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) - Errors.INVALID_PRODUCER_EPOCH -else - Errors.NETWORK_EXCEPTION -val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() -currentTransactionData.topics().forEach { topic => - topic.partitions().forEach { partition => -topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error) - } + val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + + // There are 3 cases if we already have existing data + // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced + // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception + // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify + if (existingTransactionData != null) { +if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) { Review Comment: nit: `()` can be omitted. There are a few other case below. 
## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -36,45 +36,58 @@ class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransac val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) -class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { - + private val inflightNodes = mutable.HashSet[Node]() private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() - + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { nodesToTransactions.synchronized { // Check if we have already have either node or individual transaction. Add the Node if it isn't there. - val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, + val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, new TransactionDataAndCallbacks( new AddPartitionsToTxnTransactionCollection(1), mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) - val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) - - // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and - // reconnected so return the retriable network exceptio
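The three epoch cases enumerated in the comment above can be summarized as a small decision table. The sketch below is Java and self-contained; the actual change is Scala in AddPartitionsToTxnManager, and the enum and method names here are invented for illustration only.

// Toy decision table for the three cases: higher incoming epoch fences the
// existing data, an equal epoch is treated as a retry (retriable error for the
// existing data), and a lower incoming epoch is itself fenced and not added.
enum TxnError { NONE, INVALID_PRODUCER_EPOCH, NETWORK_EXCEPTION }

final class EpochCaseSketch {
    // Error returned for the *existing* pending verification, or NONE when the
    // existing entry is kept and the incoming request is rejected instead.
    static TxnError errorForExisting(int existingEpoch, int incomingEpoch) {
        if (incomingEpoch > existingEpoch) {
            return TxnError.INVALID_PRODUCER_EPOCH;   // existing data is fenced
        } else if (incomingEpoch == existingEpoch) {
            return TxnError.NETWORK_EXCEPTION;        // likely a client retry; retriable
        } else {
            return TxnError.NONE;                     // incoming is fenced instead
        }
    }

    // Error returned to the *incoming* request in the third case.
    static TxnError errorForIncoming(int existingEpoch, int incomingEpoch) {
        return incomingEpoch < existingEpoch ? TxnError.INVALID_PRODUCER_EPOCH : TxnError.NONE;
    }

    public static void main(String[] args) {
        System.out.println(errorForExisting(5, 6));   // INVALID_PRODUCER_EPOCH
        System.out.println(errorForExisting(5, 5));   // NETWORK_EXCEPTION
        System.out.println(errorForIncoming(5, 4));   // INVALID_PRODUCER_EPOCH
    }
}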
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
jeffkbkim commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213190623 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * The lock for protecting access to the resources. + */ +private final ReentrantLock lock; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.lock = new ReentrantLock(); +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread( +threadPrefix + threadId +) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. + */ +class EventProcessorThread extends Thread { +private final Logger log; + +EventProcessorThread( +String name +) { +super(name); +log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class); +setDaemon(false); +} + +private void handleEvents() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); Review Comment: is it more on logging multiple times? i don't see any harm in calling accumulator.close multiple times. also on MultiThreadedEventProcessor#close() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213165026 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * The lock for protecting access to the resources. + */ +private final ReentrantLock lock; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.lock = new ReentrantLock(); +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread( +threadPrefix + threadId +) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. + */ +class EventProcessorThread extends Thread { +private final Logger log; + +EventProcessorThread( +String name +) { +super(name); +log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class); +setDaemon(false); +} + +private void handleEvents() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); Review Comment: We need a lock in shutdown because we access multiple variables and we don't want to trigger the shutdown multiple times. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
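The locking concern discussed in this thread can be illustrated with a small, hedged sketch: the lock only guards the check-and-set of the shutdown flag so that the shutdown sequence runs (and is logged) exactly once, even though closing the accumulator itself may be safe to repeat. The class and field names below are assumptions for illustration only, not the code under review.

```java
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch (assumed names): an idempotent, lock-guarded shutdown
// for a multi-threaded event processor. The lock protects the check-and-set
// of the flag so the shutdown sequence cannot be triggered twice.
public final class ShutdownSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final List<Thread> threads;
    private final AutoCloseable accumulator;
    private volatile boolean shuttingDown = false;

    public ShutdownSketch(List<Thread> threads, AutoCloseable accumulator) {
        this.threads = threads;
        this.accumulator = accumulator;
    }

    public void close() throws Exception {
        lock.lock();
        try {
            if (shuttingDown) {
                return; // shutdown already triggered once; later calls are no-ops
            }
            shuttingDown = true;
        } finally {
            lock.unlock();
        }
        // Closing the accumulator unblocks worker threads parked in poll().
        accumulator.close();
        for (Thread thread : threads) {
            thread.join();
        }
    }
}
```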
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1213163941 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import java.util.concurrent.RejectedExecutionException; + +/** + * A {{@link CoordinatorEvent}} processor. + */ +public interface CoordinatorEventProcessor extends AutoCloseable { Review Comment: Yep. I use a different implementation in unit tests (without threads) and I plan to use another one for simulation tests at some point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
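The alternative implementations mentioned here could look roughly like the sketch below: a processor that runs each event inline on the caller's thread, which keeps unit tests deterministic and avoids thread management entirely. The class and method names are assumptions for illustration; they are not the actual Kafka interface.

```java
import java.util.concurrent.RejectedExecutionException;

// Illustrative sketch (assumed names): a non-threaded event processor that
// executes events synchronously, the kind of implementation that is handy
// for unit tests where deterministic ordering matters.
public final class DirectEventProcessorSketch implements AutoCloseable {
    private boolean closed = false;

    // Run the event immediately on the caller's thread.
    public void enqueue(Runnable event) {
        if (closed) {
            throw new RejectedExecutionException("Processor is closed");
        }
        event.run();
    }

    @Override
    public void close() {
        closed = true;
    }
}
```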
[jira] [Updated] (KAFKA-15047) Handle rolling segments when the active segment's retention is breached in case tiered storage is enabled.
[ https://issues.apache.org/jira/browse/KAFKA-15047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15047: --- Description: Active segments are not copied by the remote storage subsystem, but they can be eligible for retention cleanup. So we need to roll the active segment when remote storage is enabled so that it can be copied by the remote storage subsystem and eventually picked up for retention cleanup. > Handle rolling segments when the active segment's retention is breached > in case tiered storage is enabled. > > > Key: KAFKA-15047 > URL: https://issues.apache.org/jira/browse/KAFKA-15047 > Project: Kafka > Issue Type: Sub-task > Reporter: Satish Duggana > Assignee: Kamal Chandraprakash > Priority: Major > > Active segments are not copied by the remote storage subsystem, but they can be > eligible for retention cleanup. So we need to roll the active segment when remote > storage is enabled so that it can be copied by the remote storage subsystem and > eventually picked up for retention cleanup. -- This message was sent by Atlassian Jira (v8.20.10#820010)
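As a rough sketch of the behaviour described above (assumed names only; the real logic would live alongside the existing retention checks in UnifiedLog), the roll decision combines two conditions: tiered storage is enabled and the active segment's retention is breached.

```java
// Illustrative sketch with assumed names, not the actual UnifiedLog code.
// The point is only that an active segment whose retention is breached must
// be rolled when tiered storage is enabled, otherwise it can never be copied
// to remote storage and later picked up for retention cleanup.
public final class ActiveSegmentRollSketch {
    public static boolean shouldRoll(boolean remoteStorageEnabled,
                                     long segmentFirstTimestampMs,
                                     long nowMs,
                                     long retentionMs) {
        boolean retentionBreached = retentionMs >= 0
                && nowMs - segmentFirstTimestampMs > retentionMs;
        return remoteStorageEnabled && retentionBreached;
    }
}
```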
[GitHub] [kafka] novosibman commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version
novosibman commented on PR #13782: URL: https://github.com/apache/kafka/pull/13782#issuecomment-1572027303 > Are all the graphs shared for OMB and Kafka Tussle generated for Kafka with the fix in this PR? The graphs with the fix are the ones noted in the first description comment, marked with the `kafka_2.13-3.6.0-snapshot-fix` label. The other graphs in the later comment are examples of how rolling affects results on different configurations and benchmarks using a regular Kafka release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1572025066 > No, you are correct, I had another look at the neighbouring code after I wrote my previous comment and I agree that this appears not to cause any leadership movements. Okay, I am happy to give my approval. Can you rebase on the latest trunk so the tests can be rerun? Hi, I have merged the latest trunk into the PR branch and re-ran the unit tests. It seems that the remaining failures are unrelated to this change. @clolov -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request, #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime
dajac opened a new pull request, #13795: URL: https://github.com/apache/kafka/pull/13795 This PR temporarily includes https://github.com/apache/kafka/pull/13666, https://github.com/apache/kafka/pull/13675, https://github.com/apache/kafka/pull/13793 and https://github.com/apache/kafka/pull/13794. This patch introduces the CoordinatorRuntime, a framework which encapsulates all the common features required to build a coordinator such as the group coordinator. Please refer to the javadoc of that class for the details. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213113458 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1383,20 +1421,30 @@ class UnifiedLog(@volatile var logStartOffset: Long, } } + private[log] def localRetentionMs: Long = { +if (config.remoteLogConfig.remoteStorageEnable) config.remoteLogConfig.localRetentionMs else config.retentionMs + } + private def deleteRetentionMsBreachedSegments(): Int = { -if (config.retentionMs < 0) return 0 +val retentionMs = localRetentionMs Review Comment: This is addressed with the latest commits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213112946 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -154,16 +154,41 @@ class UnifiedLog(@volatile var logStartOffset: Long, locally { initializePartitionMetadata() updateLogStartOffset(logStartOffset) +updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L))) +if (!remoteLogEnabled()) + logStartOffset = localLogStartOffset() Review Comment: We already set the localLogStartOffset as max of passed logStartOffset and the first segment's base offset. When remote log is not enabled, `logStartOffset` is set as `localLogStartOffset` as computed above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
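Restating the initialization described in this comment as a small sketch (Java purely for illustration, with assumed names; the real code is the Scala shown in the diff): the local log start offset is floored at the first local segment's base offset, and without remote storage the overall log start offset simply follows it.

```java
// Illustrative sketch (assumed names) of the offset initialization described
// above; the actual implementation is the Scala code in UnifiedLog.
public final class StartOffsetInitSketch {
    public static long localLogStartOffset(long logStartOffset, long firstSegmentBaseOffset) {
        // Never below the first local segment's base offset.
        return Math.max(logStartOffset, firstSegmentBaseOffset);
    }

    public static long effectiveLogStartOffset(boolean remoteLogEnabled,
                                               long logStartOffset,
                                               long localLogStartOffset) {
        // Without tiered storage there is no remote prefix, so the two coincide.
        return remoteLogEnabled ? logStartOffset : localLogStartOffset;
    }
}
```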
[GitHub] [kafka] dajac opened a new pull request, #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces
dajac opened a new pull request, #13794: URL: https://github.com/apache/kafka/pull/13794 As the title says. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request, #13793: KAFKA-14462; [15/N] Make Result generic and rename it
dajac opened a new pull request, #13793: URL: https://github.com/apache/kafka/pull/13793 As the title says. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107956 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +629,193 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} + +class RemoteLogRetentionHandler { + +private long remainingBreachedSize = 0L; +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(long remainingBreachedSize) { +this.remainingBreachedSize = remainingBreachedSize; +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size > 0 +if (checkSizeRetention && remainingBreachedSize > 0) { +remainingBreachedSize -= x.segmentSizeInBytes(); +return remainingBreachedSize >= 0; +} else return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " + +"${log.config.retentionSize} breach. Log size after deletion will be " + +"${remainingBreachedSize + log.config.retentionSize}."); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +// No need to update the logStartOffset. +return isSegmentDeleted; +} + +// There are two cases: +// 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the Review Comment: Updated the comment to make it more clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107255 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +629,193 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} + +class RemoteLogRetentionHandler { + +private long remainingBreachedSize = 0L; +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(long remainingBreachedSize) { +this.remainingBreachedSize = remainingBreachedSize; +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size > 0 +if (checkSizeRetention && remainingBreachedSize > 0) { +remainingBreachedSize -= x.segmentSizeInBytes(); +return remainingBreachedSize >= 0; +} else return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " + +"${log.config.retentionSize} breach. Log size after deletion will be " + +"${remainingBreachedSize + log.config.retentionSize}."); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +// No need to update the logStartOffset. +return isSegmentDeleted; +} + +// There are two cases: +// 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the +//leader-epoch gets bumped but the log-start-offset gets truncated back to 0. +// 2) To remove the unreferenced segments. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); +if (isSegmentDeleted) { +logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", +metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); +} + +// No need to update the log-start-offset as these epochs/offsets are earlier to that value. +return isSegmentDeleted; +} + +private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate predicate) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (predicate.test(segmentMetadata)) { +// Publish delete segment started event. 
+remoteLogMetadataManager.updateRemoteLogSegmentMetadata( +new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + +// Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + +// Publish delete segment finished event. +remoteLogMe
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213102668 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +629,193 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} + +class RemoteLogRetentionHandler { + +private long remainingBreachedSize = 0L; +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(long remainingBreachedSize) { +this.remainingBreachedSize = remainingBreachedSize; +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size > 0 +if (checkSizeRetention && remainingBreachedSize > 0) { +remainingBreachedSize -= x.segmentSizeInBytes(); +return remainingBreachedSize >= 0; +} else return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " + +"${log.config.retentionSize} breach. Log size after deletion will be " + +"${remainingBreachedSize + log.config.retentionSize}."); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +// No need to update the logStartOffset. +return isSegmentDeleted; +} + +// There are two cases: +// 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the +//leader-epoch gets bumped but the log-start-offset gets truncated back to 0. +// 2) To remove the unreferenced segments. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); +if (isSegmentDeleted) { +logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", +metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); +} + +// No need to update the log-start-offset as these epochs/offsets are earlier to that value. +return isSegmentDeleted; +} + +private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate predicate) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (predicate.test(segmentMetadata)) { +// Publish delete segment started event. 
+remoteLogMetadataManager.updateRemoteLogSegmentMetadata( +new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + +// Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + +// Publish delete segment finished event. +remoteLogMe
[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
satishd commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1213100593 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +629,193 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} + +class RemoteLogRetentionHandler { + +private long remainingBreachedSize = 0L; +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(long remainingBreachedSize) { +this.remainingBreachedSize = remainingBreachedSize; +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size > 0 +if (checkSizeRetention && remainingBreachedSize > 0) { +remainingBreachedSize -= x.segmentSizeInBytes(); +return remainingBreachedSize >= 0; +} else return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " + +"${log.config.retentionSize} breach. Log size after deletion will be " + +"${remainingBreachedSize + log.config.retentionSize}."); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +// No need to update the logStartOffset. +return isSegmentDeleted; +} + +// There are two cases: +// 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the +//leader-epoch gets bumped but the log-start-offset gets truncated back to 0. +// 2) To remove the unreferenced segments. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); +if (isSegmentDeleted) { +logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", +metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet()); +} + +// No need to update the log-start-offset as these epochs/offsets are earlier to that value. +return isSegmentDeleted; +} + +private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate predicate) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (predicate.test(segmentMetadata)) { +// Publish delete segment started event. 
+remoteLogMetadataManager.updateRemoteLogSegmentMetadata( +new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), + RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); + +// Delete the segment in remote storage. + remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); + +// Publish delete segment finished event. +remoteLogMe
[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
Hangleton commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1571989946 Hello David (@dajac), I was discussing this with Christo today as part of his work on the OffsetFetch API. Would you like this PR on OffsetCommit to be split to make the review easier and reduce risks? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15047) Handle rolling segments when the active segment's retention is breached in case tiered storage is enabled.
Satish Duggana created KAFKA-15047: -- Summary: Handle rolling segments when the active segment's retention is breached in case tiered storage is enabled. Key: KAFKA-15047 URL: https://issues.apache.org/jira/browse/KAFKA-15047 Project: Kafka Issue Type: Sub-task Reporter: Satish Duggana Assignee: Kamal Chandraprakash -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gr-m-9 commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup
gr-m-9 commented on code in PR #13490: URL: https://github.com/apache/kafka/pull/13490#discussion_r1212990392 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -447,6 +482,19 @@ public void close() { @Override public void close(Duration timeout) { +if (timeout.toMillis() < 0) +throw new IllegalArgumentException("The timeout cannot be negative."); +try { +if (!closed) { +close(timeout, false); +} +} finally { +closed = true; Review Comment: could you explain why closed is boolean but shouldWakeup is AtomicBoolean ? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -114,14 +119,14 @@ public PrototypeAsyncConsumer(final ConsumerConfig config, final Deserializer valueDeserializer) { this.time = Time.SYSTEM; GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, -GroupRebalanceConfig.ProtocolType.CONSUMER); +GroupRebalanceConfig.ProtocolType.CONSUMER); this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId); this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); // If group.instance.id is set, we will append it to the log context. if (groupRebalanceConfig.groupInstanceId.isPresent()) { logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() + -", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); +", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); Review Comment: creation such of instances shall be done in separate methods -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
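As background to the question about `closed` versus `shouldWakeup`, the usual distinction is which threads touch the flag: a field read and written only by the thread calling close() can stay a plain boolean, while a flag that must be observed by another thread (for example a background polling thread) needs an AtomicBoolean or a volatile field for visibility. A minimal sketch with assumed names, not the consumer code under review:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch (assumed names) of the visibility reasoning behind the
// question: single-thread flags can be plain fields, while cross-thread
// signals need AtomicBoolean (or volatile) so the other thread sees updates.
public final class FlagVisibilitySketch {
    // Touched only by the thread that calls close(), so a plain field works
    // as long as close() is not invoked concurrently.
    private boolean closed = false;

    // Set by the application thread, read by a background thread, so it must
    // be a thread-safe flag for the wakeup to be seen promptly.
    private final AtomicBoolean shouldWakeup = new AtomicBoolean(false);

    public void close() {
        closed = true;
    }

    public void wakeup() {
        shouldWakeup.set(true);
    }

    // Called from the background thread's loop.
    public boolean wakeupRequested() {
        return shouldWakeup.get();
    }
}
```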
[GitHub] [kafka] andymg3 commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration
andymg3 commented on code in PR #13788: URL: https://github.com/apache/kafka/pull/13788#discussion_r1212980561 ## metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java: ## @@ -77,39 +77,75 @@ public void testChangeRecordIsNoOp() { ); } -private final static PartitionRegistration FOO = new PartitionRegistration( -new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE, -1, LeaderRecoveryState.RECOVERED, 100, 200); +private static final PartitionRegistration FOO; + +static { +try { +FOO = new PartitionRegistration.Builder(). +setReplicas(new int[] {2, 1, 3}). +setIsr(new int[] {2, 1, 3}). +setRemovingReplicas(Replicas.NONE). +setAddingReplicas(Replicas.NONE). Review Comment: Done here and all the other places. ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -33,6 +33,92 @@ public class PartitionRegistration { + +/** + * A builder class which creates a PartitionRegistration. + */ +static public class Builder { +private int[] replicas; +private int[] isr; +private int[] removingReplicas; +private int[] addingReplicas; +private Integer leader; +private LeaderRecoveryState leaderRecoveryState; +private Integer leaderEpoch; +private Integer partitionEpoch; + +public Builder setReplicas(int[] replicas) { +this.replicas = replicas; +return this; +} + +public Builder setIsr(int[] isr) { +this.isr = isr; +return this; +} + +public Builder setRemovingReplicas(int[] removingReplicas) { +this.removingReplicas = removingReplicas; +return this; +} + +public Builder setAddingReplicas(int[] addingReplicas) { +this.addingReplicas = addingReplicas; +return this; +} + +public Builder setLeader(Integer leader) { +this.leader = leader; +return this; +} + +public Builder setLeaderRecoveryState(LeaderRecoveryState leaderRecoveryState) { +this.leaderRecoveryState = leaderRecoveryState; +return this; +} + +public Builder setLeaderEpoch(Integer leaderEpoch) { +this.leaderEpoch = leaderEpoch; +return this; +} + +public Builder setPartitionEpoch(Integer partitionEpoch) { +this.partitionEpoch = partitionEpoch; +return this; +} + +public PartitionRegistration build() throws Exception { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] andymg3 commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration
andymg3 commented on code in PR #13788: URL: https://github.com/apache/kafka/pull/13788#discussion_r1212980304 ## metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java: ## @@ -77,39 +77,75 @@ public void testChangeRecordIsNoOp() { ); } -private final static PartitionRegistration FOO = new PartitionRegistration( -new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE, -1, LeaderRecoveryState.RECOVERED, 100, 200); +private static final PartitionRegistration FOO; + +static { +try { +FOO = new PartitionRegistration.Builder(). +setReplicas(new int[] {2, 1, 3}). +setIsr(new int[] {2, 1, 3}). +setRemovingReplicas(Replicas.NONE). +setAddingReplicas(Replicas.NONE). +setLeader(1). +setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). +setLeaderEpoch(100). +setPartitionEpoch(200). +build(); +} catch (Exception e) { +throw new RuntimeException(e); +} Review Comment: Yeah. I've changed the signature to not have `throws Exception` so I've removed the catches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] andymg3 commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration
andymg3 commented on code in PR #13788: URL: https://github.com/apache/kafka/pull/13788#discussion_r1212978727 ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -33,6 +33,92 @@ public class PartitionRegistration { + +/** + * A builder class which creates a PartitionRegistration. + */ +static public class Builder { +private int[] replicas; +private int[] isr; +private int[] removingReplicas; +private int[] addingReplicas; +private Integer leader; +private LeaderRecoveryState leaderRecoveryState; +private Integer leaderEpoch; +private Integer partitionEpoch; + +public Builder setReplicas(int[] replicas) { +this.replicas = replicas; +return this; +} + +public Builder setIsr(int[] isr) { +this.isr = isr; +return this; +} + +public Builder setRemovingReplicas(int[] removingReplicas) { +this.removingReplicas = removingReplicas; +return this; +} + +public Builder setAddingReplicas(int[] addingReplicas) { +this.addingReplicas = addingReplicas; +return this; +} + +public Builder setLeader(Integer leader) { +this.leader = leader; +return this; +} + +public Builder setLeaderRecoveryState(LeaderRecoveryState leaderRecoveryState) { +this.leaderRecoveryState = leaderRecoveryState; +return this; +} + +public Builder setLeaderEpoch(Integer leaderEpoch) { +this.leaderEpoch = leaderEpoch; +return this; +} + +public Builder setPartitionEpoch(Integer partitionEpoch) { +this.partitionEpoch = partitionEpoch; +return this; +} + +public PartitionRegistration build() throws Exception { +if (replicas == null) { +throw new IllegalStateException("You must set replicas."); +} else if (isr == null) { +throw new IllegalStateException("You must set isr."); +} else if (removingReplicas == null) { +throw new IllegalStateException("You must set removing replicas."); +} else if (addingReplicas == null) { +throw new IllegalStateException("You must set adding replicas."); Review Comment: Done. Used `Replicas.NONE`. ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -57,7 +143,7 @@ public PartitionRegistration(PartitionRecord record) { record.partitionEpoch()); } -public PartitionRegistration(int[] replicas, int[] isr, int[] removingReplicas, +protected PartitionRegistration(int[] replicas, int[] isr, int[] removingReplicas, Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft
mimaison commented on PR #13792: URL: https://github.com/apache/kafka/pull/13792#issuecomment-1571810081 @tombentley @dajac Can you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gr-m-9 commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup
gr-m-9 commented on code in PR #13490: URL: https://github.com/apache/kafka/pull/13490#discussion_r1212959825 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -114,14 +119,14 @@ public PrototypeAsyncConsumer(final ConsumerConfig config, final Deserializer valueDeserializer) { this.time = Time.SYSTEM; GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, -GroupRebalanceConfig.ProtocolType.CONSUMER); +GroupRebalanceConfig.ProtocolType.CONSUMER); this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId); this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); // If group.instance.id is set, we will append it to the log context. if (groupRebalanceConfig.groupInstanceId.isPresent()) { logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() + -", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); +", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); } else { logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); } Review Comment: a bit inconsistent: this.groupId and later groupId.orElse, when there's no need to use this then it should't be used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison opened a new pull request, #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft
mimaison opened a new pull request, #13792: URL: https://github.com/apache/kafka/pull/13792 Moved the ZK and KRaft steps into `h5` sections so they are below the `h4` Upgrading to 3.5.0 from any version 0.8.x through 3.4.x section. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org