[GitHub] [kafka] yashmayya opened a new pull request, #13800: KAFKA-15012: Allow leading zeros in numeric fields while deserializing JSON messages using the JsonConverter

2023-06-01 Thread via GitHub


yashmayya opened a new pull request, #13800:
URL: https://github.com/apache/kafka/pull/13800

   - https://issues.apache.org/jira/browse/KAFKA-15012
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-06-01 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-15012:
--

Assignee: Yash Mayya

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Assignee: Yash Mayya
>Priority: Major
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector 
> using JsonConverter fails with the below exception
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] 
> to Kafka Connect data failed due to serialization error: 
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>   ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading 
> zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric 
> value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
>   at 
> com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
>   at 
> com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855)
>   at 
> com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
>   at 
> org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:322)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> o
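
The attached patch enables ALLOW_LEADING_ZEROS_FOR_NUMBERS on the converter's 
Jackson ObjectMapper. As a minimal, standalone sketch of that Jackson setting 
(not the actual patch; it assumes Jackson 2.10+ where JsonReadFeature is 
available), using the payload from the stack trace:

{code:java}
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;

public class LeadingZerosSketch {
    public static void main(String[] args) throws Exception {
        // With default settings this payload fails with
        // "Invalid numeric value: Leading zeroes not allowed".
        ObjectMapper mapper = JsonMapper.builder()
                .enable(JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS)
                .build();
        JsonNode node = mapper.readTree("00080153032837");
        System.out.println(node.longValue()); // 80153032837
    }
}
{code}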

[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728551#comment-17728551
 ] 

Haruki Okada commented on KAFKA-15046:
--

[~showuon]  Maybe I linked the wrong file.

What I had in mind is to have any LeaderEpochFileCache methods that need 
flush() be called outside of the Log's global lock.

LeaderEpochFileCache already handles mutual exclusion with its own RW lock, so 
I think we don't need to call it inside the Log's global lock.

[https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L44]
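
As a rough, hypothetical Java sketch of that idea (update the cache under its 
own read-write lock, and run the slow checkpoint/fsync on a copy outside any 
log-level lock); none of the names below are the actual Kafka classes:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class EpochCacheSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<long[]> entries = new ArrayList<>();  // (epoch, startOffset) pairs

    // Safe to call without holding the log's global lock: the cache guards its
    // own state with this read-write lock, and the slow fsync works on a copy.
    void assign(long epoch, long startOffset) {
        List<long[]> snapshot;
        lock.writeLock().lock();
        try {
            entries.add(new long[]{epoch, startOffset});
            snapshot = new ArrayList<>(entries);  // cheap copy under the write lock
        } finally {
            lock.writeLock().unlock();
        }
        flush(snapshot);  // checkpoint + fsync happens outside the locks
    }

    private void flush(List<long[]> snapshot) {
        // placeholder for CheckpointFile.write(snapshot), which synchronizes internally
    }
}
{code}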

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Also, request-time throttling happened around the time of the incident. 
> This caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** From taking jstack several times, we found that in most of them a 
> request-handler was doing fsync to flush ProducerState while other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native 
> Method)
> at 
> sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
> at 
> sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
> at 
> kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
> at 
> kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
> at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
> at 
> kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
> at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
> at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code}
>  * 
>  ** Also, there were a bunch of logs showing that writing producer snapshots 
> took hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote 
> producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager partition=yyy-34] W

[jira] [Comment Edited] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728515#comment-17728515
 ] 

Haruki Okada edited comment on KAFKA-15046 at 6/2/23 4:01 AM:
--

Yeah, io_uring is promising.

However, it only works with newer kernels (which may not be easy for some 
on-premises Kafka users to update) and would require rewriting a lot of the 
code base.

-For leader-epoch cache, the checkpointing is already done in scheduler thread 
so we should adopt solution2 I think-

For the leader epoch cache, some paths already do the checkpointing 
asynchronously (e.g. UnifiedLog.deleteOldSegments => 
UnifiedLog.maybeIncrementLogStartOffset => LeaderEpochFileCache.truncateFromStart 
on the kafka scheduler), so I think we have to make the fsync happen outside of 
the lock (i.e. solution-2) anyway.

 

Writing to CheckpointFile is already synchronized, so can't we just move 
checkpointing to outside of the lock? 
[https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76]
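
For reference, a simplified, hypothetical sketch of what such a synchronized 
checkpoint write looks like (writers serialize on a lock, the data goes to a 
temp file, then an atomic rename publishes it); this is illustrative only, not 
the actual CheckpointFile implementation:

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;

class CheckpointWriteSketch {
    private final Object writeLock = new Object();
    private final Path file;

    CheckpointWriteSketch(Path file) {
        this.file = file;
    }

    // Writers are serialized, and readers only ever see the previous file or
    // the fully written, freshly renamed one, never a half-written checkpoint.
    void write(List<String> lines) throws IOException {
        synchronized (writeLock) {
            Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
            Files.write(tmp, lines, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
            // A production implementation would also fsync the temp file before the rename.
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING,
                    StandardCopyOption.ATOMIC_MOVE);
        }
    }
}
{code}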


was (Author: ocadaruma):
Yeah, io_uring is promising.

However it only works with newer kernel (which some on-premises Kafka users may 
not be easy to update) and require rewriting a lot of parts of the code base.

For leader-epoch cache, the checkpointing is already done in scheduler thread 
so we should adopt solution2 I think.

Writing to CheckpointFile is already synchronized, so can't we just move 
checkpointing to outside of the lock? 
[https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76]

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Also, request-time throttling happened around the time of the incident. 
> This caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** From taking jstack several times, we found that in most of them a 
> request-handler was doing fsync to flush ProducerState while other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native 
> Method)
> at 
> sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
> at 
> sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
> at 
> kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
> at 
> kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
> at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
> at 
> kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptim

[GitHub] [kafka] dengziming closed pull request #13761: Reproduce KAFKA-14996, Test restart controller

2023-06-01 Thread via GitHub


dengziming closed pull request #13761: Reproduce KAFKA-14996, Test restart 
controller
URL: https://github.com/apache/kafka/pull/13761





[GitHub] [kafka] dengziming closed pull request #13777: KAFKA-15036: UnknownServerError on any leader failover

2023-06-01 Thread via GitHub


dengziming closed pull request #13777: KAFKA-15036: UnknownServerError on any 
leader failover
URL: https://github.com/apache/kafka/pull/13777





[GitHub] [kafka] dengziming commented on pull request #13777: KAFKA-15036: UnknownServerError on any leader failover

2023-06-01 Thread via GitHub


dengziming commented on PR #13777:
URL: https://github.com/apache/kafka/pull/13777#issuecomment-1573073315

   this will be fixed in #13799





[GitHub] [kafka] dengziming commented on a diff in pull request #13799: KAFKA-15048: Improve handling of unexpected quorum controller errors

2023-06-01 Thread via GitHub


dengziming commented on code in PR #13799:
URL: https://github.com/apache/kafka/pull/13799#discussion_r1213858197


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -425,25 +430,24 @@ public void accept(ConfigResource configResource) {
 
 public static final String CONTROLLER_THREAD_SUFFIX = 
"QuorumControllerEventHandler";
 
-private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
-"The active controller appears to be node ";
-
-private NotControllerException newNotControllerException() {
-OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
-if (latestController.isPresent()) {
-return new 
NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
-latestController.getAsInt() + ".");
-} else {
-return new NotControllerException("No controller appears to be 
active.");
-}
+private OptionalInt latestController() {
+return raftClient.leaderAndEpoch().leaderId();
 }
 
-private NotControllerException newPreMigrationException() {
-OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
-if (latestController.isPresent()) {
-return new NotControllerException("The controller is in 
pre-migration mode.");
+/**
+ * @return  The offset that we should perform read operations at.
+ */
+private long currentReadOffset() {
+if (isActiveController()) {
+// The active controller keeps an in-memory snapshot at the last 
committed offset,
+// which we want to read from when performing read operations. 
This will avoid
+// reading uncommitted data.
+return lastCommittedOffset;
 } else {
-return new NotControllerException("No controller appears to be 
active.");
+// Standby controllers never have uncommitted data in memory. 
Therefore, we return
+// Long.MAX_VALUE, a special value which means "always read the 
latest from every
+// data structure."
+return Long.MAX_VALUE;

Review Comment:
   How about using `SnapshotRegistry.LATEST_EPOCH`
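
   A small, self-contained illustration of the read-offset selection in this 
diff, with a named constant in place of the bare Long.MAX_VALUE sentinel as 
suggested; the class and field names are illustrative, not the actual 
controller code:

{code:java}
public class ReadOffsetSketch {
    // Sentinel meaning "always read the latest value from every timeline structure".
    static final long LATEST_EPOCH = Long.MAX_VALUE;

    private final boolean isActiveController;
    private final long lastCommittedOffset;

    ReadOffsetSketch(boolean isActiveController, long lastCommittedOffset) {
        this.isActiveController = isActiveController;
        this.lastCommittedOffset = lastCommittedOffset;
    }

    long currentReadOffset() {
        // The active controller reads at its last committed offset so that
        // uncommitted in-memory state is never exposed; a standby has no
        // uncommitted data and reads the latest from every structure.
        return isActiveController ? lastCommittedOffset : LATEST_EPOCH;
    }
}
{code}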






[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728547#comment-17728547
 ] 

Luke Chen commented on KAFKA-15046:
---

I agree that io_uring is a good way to fix that, but it needs more discussion. 
And maybe we still need a fallback solution for older OSes.

> Writing to CheckpointFile is already synchronized, so can't we just move 
> checkpointing to outside of the lock?

[~ocadaruma], I don't get it. If we don't hold the lock during the write, other 
threads might read partial data, or there could be concurrent writes. Why do 
you think we don't need the lock?

 

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Also, request-time throttling happened around the time of the incident. 
> This caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** From taking jstack several times, we found that in most of them a 
> request-handler was doing fsync to flush ProducerState while other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native 
> Method)
> at 
> sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
> at 
> sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
> at 
> kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
> at 
> kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
> at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
> at 
> kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
> at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
> at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code}
>  * 
>  ** Also, there were a bunch of logs showing that writing producer snapshots 
> took hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote 
> producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager parti

[GitHub] [kafka] artemlivshits commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-01 Thread via GitHub


artemlivshits commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1213645281


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -579,9 +579,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
+  /**
+   * Maybe create and return the verification state for the given producer ID 
if the transaction is not ongoing. Otherwise return null.
+   */
+  def transactionNeedsVerifying(producerId: Long): Object = lock synchronized {

Review Comment:
   The name implies that it's a function with no side effects, but it actually 
starts verification by installing some state.  Maybe we should use 
"maybeStartTransactionVerification" or something like that.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -184,6 +186,27 @@ private void clearProducerIds() {
 producerIdCount = 0;
 }
 
+/**
+ * Maybe create the VerificationStateEntry for a given producer ID. Return 
it if it exists, otherwise return null.
+ */
+public VerificationStateEntry verificationStateEntry(long producerId, 
boolean createIfAbsent) {
+return verificationStates.computeIfAbsent(producerId, pid -> {
+if (createIfAbsent)
+return new VerificationStateEntry(pid, time.milliseconds());
+else {
+log.warn("The given producer ID did not have an entry in the 
producer state manager, so it's state will be returned as null");

Review Comment:
   Isn't it expected that we find no entry if we got a race condition with a 
concurrent abort?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1073,7 +1076,8 @@ class ReplicaManager(val config: KafkaConfig,
origin: AppendOrigin,
entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
requiredAcks: Short,
-   requestLocal: RequestLocal): 
Map[TopicPartition, LogAppendResult] = {
+   requestLocal: RequestLocal,
+   verificationState: Object): Map[TopicPartition, 
LogAppendResult] = {

Review Comment:
   This should be per partition.  Either this should be a Map[TopicPartition, 
Object] or it should be added to the entriesPerPartition map.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class represents the verification state of a specific producer-id.
+ * It contains a verificationState object that is used to uniquely identify 
the transaction we want to verify.
+ * After verifying, we retain this object until we append to the log. This 
prevents any race conditions where the transaction
+ * may end via a control marker before we write to the log. This mechanism is 
used to prevent hanging transactions.
+ * We remove the verification state whenever we write data to the transaction 
or write an end marker for the transaction.
+ * Any lingering entries that are never verified are removed via the producer 
state entry cleanup mechanism.
+ */
+public class VerificationStateEntry {
+
+private final long producerId;
+private long timestamp;
+private Object verificationState;

Review Comment:
   The purpose of this is to protect the atomicity of the verification 
operation from concurrent modifications (a sort of 'optimistic locking'); maybe 
verificationSentinel or verificationGuard or verificationTripwire would be a 
better name?
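
   A standalone, hypothetical sketch of that "verification guard" idea: an 
opaque object is installed when verification starts, and a later append only 
clears it if the exact same instance is still present, so a racing abort or 
end marker invalidates the verification. None of these names are the actual 
Kafka classes:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class VerificationGuardSketch {
    private final Map<Long, Object> guards = new ConcurrentHashMap<>();

    // Called when verification starts for a producer id; returns the guard object.
    Object maybeStartVerification(long producerId) {
        return guards.computeIfAbsent(producerId, id -> new Object());
    }

    // Called on data append or end marker: succeeds only with the same guard instance.
    boolean verifyAndClear(long producerId, Object guard) {
        return guards.remove(producerId, guard);  // atomic compare-and-remove
    }
}
{code}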



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1004,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is 
client, that we either have an ongoing transaction or verified transaction 
state.

Review Comment:
   I think we need to add some d

[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728515#comment-17728515
 ] 

Haruki Okada commented on KAFKA-15046:
--

Yeah, io_uring is promising.

However, it only works with newer kernels (which may not be easy for some 
on-premises Kafka users to update) and would require rewriting a lot of the 
code base.

For the leader-epoch cache, the checkpointing is already done in a scheduler 
thread, so I think we should adopt solution 2.

Writing to CheckpointFile is already synchronized, so can't we just move 
checkpointing to outside of the lock? 
https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Also, request-time throttling happened around the time of the incident. 
> This caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** From taking jstack several times, we found that in most of them a 
> request-handler was doing fsync to flush ProducerState while other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native 
> Method)
> at 
> sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
> at 
> sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
> at 
> kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
> at 
> kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
> at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
> at 
> kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
> at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
> at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code}
>  * 
>  ** Also, there were a bunch of logs showing that writing producer snapshots 
> took hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote 
> producer snapshot at offset 1748817854 with 8 producer ids 

[jira] [Comment Edited] (KAFKA-15046) Produce performance issue under high disk load

2023-06-01 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728515#comment-17728515
 ] 

Haruki Okada edited comment on KAFKA-15046 at 6/2/23 12:01 AM:
---

Yeah, io_uring is promising.

However, it only works with newer kernels (which may not be easy for some 
on-premises Kafka users to update) and would require rewriting a lot of the 
code base.

For the leader-epoch cache, the checkpointing is already done in a scheduler 
thread, so I think we should adopt solution 2.

Writing to CheckpointFile is already synchronized, so can't we just move 
checkpointing to outside of the lock? 
[https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76]


was (Author: ocadaruma):
Yeah, io_uring is promising.

However it only works with newer kernel (which some on-premises Kafka users may 
not be easy to update) and require a lot of parts of the code base.

For leader-epoch cache, the checkpointing is already done in scheduler thread 
so we should adopt solution2 I think.

Writing to CheckpointFile is already synchronized, so can't we just move 
checkpointing to outside of the lock? 
https://github.com/apache/kafka/blob/3.3.2/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java#L76

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Also, request-time throttling happened around the time of the incident. 
> This caused producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** From taking jstack several times, we found that in most of them a 
> request-handler was doing fsync to flush ProducerState while other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native 
> Method)
> at 
> sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
> at 
> sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
> at 
> kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
> at 
> kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
> at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
> at 
> kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
> at kafka.server.Repl

[jira] [Updated] (KAFKA-15048) Improve handling of unexpected quorum controller errors

2023-06-01 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15048:
-
Summary: Improve handling of unexpected quorum controller errors  (was: 
Improve handling of non-fatal quorum controller errors)

> Improve handling of unexpected quorum controller errors
> ---
>
> Key: KAFKA-15048
> URL: https://issues.apache.org/jira/browse/KAFKA-15048
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>






[jira] [Assigned] (KAFKA-15048) Improve handling of non-fatal quorum controller errors

2023-06-01 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-15048:


Assignee: Colin McCabe

> Improve handling of non-fatal quorum controller errors
> --
>
> Key: KAFKA-15048
> URL: https://issues.apache.org/jira/browse/KAFKA-15048
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>






[jira] [Created] (KAFKA-15048) Improve handling of non-fatal quorum controller errors

2023-06-01 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15048:


 Summary: Improve handling of non-fatal quorum controller errors
 Key: KAFKA-15048
 URL: https://issues.apache.org/jira/browse/KAFKA-15048
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe








[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces

2023-06-01 Thread via GitHub


CalvinConfluent commented on code in PR #13794:
URL: https://github.com/apache/kafka/pull/13794#discussion_r1213764761


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Replayable.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+/**
+ * The Replayable interface.
+ */
+public interface Replayable {

Review Comment:
   Do we want to name it more specifically to the coordinators?






[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1213757295


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -33,7 +35,8 @@ object AddPartitionsToTxnManager {
 
 
 class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
-  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback],
+  val earliestAdditionMs: Long)

Review Comment:
   The current approach I take is to find the max verification time for the 
transaction data. I can either change this or clarify it in the metric 
description.
   
   I wanted to take a per-transaction approach, but it gets a little harder to 
store the per-transaction data since the transaction data is an already-defined 
collection. I could instead create a (parallel) map for each transaction if 
that makes sense.






[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1213756543


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -85,12 +93,14 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
 topicPartitionsToError.put(new TopicPartition(topic.name, partition), 
error)
   }
 }
+verificationFailureRate.mark(topicPartitionsToError.size)
 topicPartitionsToError.toMap
   }
 
   private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
 override def onComplete(response: ClientResponse): Unit = {
   // Note: Synchronization is not needed on inflightNodes since it is 
always accessed from this thread.
+  verificationTimeMs.update(time.milliseconds() - 
transactionDataAndCallbacks.earliestAdditionMs)

Review Comment:
   The KIP originally included the phrasing  `This will also account for 
verifications that fail before the coordinator is called.`
   
   However, on further thought, I wonder if it is better to only update the 
value on responses from the transaction coordinator. If not, I can also 
populate the metric on earlier callbacks, but then the meaning gets a little 
convoluted because the callback only applies to a single transaction that may 
not be the earliest one added for the node. 
   






[GitHub] [kafka] cmccabe closed pull request #13735: KAFKA-15003: When partitions are partition assignments change we do n…

2023-06-01 Thread via GitHub


cmccabe closed pull request #13735: KAFKA-15003: When partitions are partition 
assignments change we do n…
URL: https://github.com/apache/kafka/pull/13735





[GitHub] [kafka] cmccabe commented on pull request #13735: KAFKA-15003: When partitions are partition assignments change we do n…

2023-06-01 Thread via GitHub


cmccabe commented on PR #13735:
URL: https://github.com/apache/kafka/pull/13735#issuecomment-1572889255

   committed





[GitHub] [kafka] jolshan opened a new pull request, #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-01 Thread via GitHub


jolshan opened a new pull request, #13798:
URL: https://github.com/apache/kafka/pull/13798

   Adding the following metrics as per KIP-890:
   
   VerificationTimeMs – number of milliseconds from adding partition info to 
the manager to the time the response is sent. This will include the round trip 
to the transaction coordinator if it is called. This will also account for 
verifications that fail before the coordinator is called.
   
   VerificationFailureRate – rate of verifications that returned in failure 
either from the AddPartitionsToTxn response or through errors in the manager.
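
   As an illustrative, non-authoritative sketch of how these two metrics could 
be wired up (using the Dropwizard metrics library as a stand-in for Kafka's 
internal metrics group; only the metric names come from the KIP):

{code:java}
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class VerificationMetricsSketch {
    private final MetricRegistry registry = new MetricRegistry();
    private final Histogram verificationTimeMs = registry.histogram("VerificationTimeMs");
    private final Meter verificationFailureRate = registry.meter("VerificationFailureRate");

    // Called when verification completes, either after the AddPartitionsToTxn
    // response comes back or after an earlier error in the manager.
    public void recordVerification(long additionTimeMs, long nowMs, int failedPartitions) {
        verificationTimeMs.update(nowMs - additionTimeMs);  // includes the coordinator round trip
        if (failedPartitions > 0) {
            verificationFailureRate.mark(failedPartitions); // one mark per failed partition
        }
    }
}
{code}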
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-15028) AddPartitionsToTxnManager metrics

2023-06-01 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-15028:
--

Assignee: Justine Olshan

> AddPartitionsToTxnManager metrics
> -
>
> Key: KAFKA-15028
> URL: https://issues.apache.org/jira/browse/KAFKA-15028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> KIP-890 added metrics for the AddPartitionsToTxnManager
> VerificationTimeMs – number of milliseconds from adding partition info to the 
> manager to the time the response is sent. This will include the round trip to 
> the transaction coordinator if it is called. This will also account for 
> verifications that fail before the coordinator is called.
> VerificationFailureRate – rate of verifications that returned in failure 
> either from the AddPartitionsToTxn response or through errors in the manager.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13735: KAFKA-15003: When partitions are partition assignments change we do n…

2023-06-01 Thread via GitHub


cmccabe commented on code in PR #13735:
URL: https://github.com/apache/kafka/pull/13735#discussion_r1213740761


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -235,4 +235,10 @@ public String toString() {
 builder.append(")");
 return builder.toString();
 }
+
+public boolean hasSameAssignment(PartitionRegistration registration) {

Review Comment:
   This should be `equals`. I'll fix it






[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-01 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1213589269


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
+import org.rocksdb.WriteBatch;
+
+/**
+ * A RocksDB backed time-ordered segmented bytes store for window key schema.
+ */
+public class RocksDBTimeOrderedKeyValueSegmentedBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore {
+
+RocksDBTimeOrderedKeyValueSegmentedBytesStore(final String name,
+  final String metricsScope,
+  final long retention,
+  final long segmentInterval,
+  final boolean withIndex) {
+super(name, metricsScope, retention, segmentInterval, new 
TimeFirstWindowKeySchema(),
+Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : 
null));
+}
+
+@Override
+protected KeyValue getIndexKeyValue(final Bytes baseKey, 
final byte[] baseValue) {
+throw new UnsupportedOperationException("Do not use for 
TimeOrderedKeyValueStore");
+}
+
+@Override
+Map getWriteBatches(final 
Collection> records) {

Review Comment:
   correct. I haven't finished the restoration logic yet



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final Duration gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRec;
+private Serde keySerde;
+private FullChangeSerde valueSerde;

[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-01 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1213589269


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
+import org.rocksdb.WriteBatch;
+
+/**
+ * A RocksDB backed time-ordered segmented bytes store for window key schema.
+ */
+public class RocksDBTimeOrderedKeyValueSegmentedBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore {
+
+RocksDBTimeOrderedKeyValueSegmentedBytesStore(final String name,
+  final String metricsScope,
+  final long retention,
+  final long segmentInterval,
+  final boolean withIndex) {
+super(name, metricsScope, retention, segmentInterval, new 
TimeFirstWindowKeySchema(),
+Optional.ofNullable(withIndex ? new KeyFirstWindowKeySchema() : 
null));
+}
+
+@Override
+protected KeyValue getIndexKeyValue(final Bytes baseKey, 
final byte[] baseValue) {
+throw new UnsupportedOperationException("Do not use for 
TimeOrderedKeyValueStore");
+}
+
+@Override
+Map getWriteBatches(final 
Collection> records) {

Review Comment:
   Correct. I haven't finished the restoration logic yet; I haven't gotten to 
that part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-01 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1213395601


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final Duration gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRec;
+private Serde keySerde;
+private FullChangeSerde valueSerde;
+private String topic;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod;
+minTimestamp = Long.MAX_VALUE;
+numRec = 0;
+bufferSize = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde) 
getter.valueSerde()) : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue = null;
+
+if (predicate.get()) {
+final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - 
gracePeriod.toMillis());

Review Comment:
   Good catch, that completely slipped my mind.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueSegmentedBytesStore.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord

[GitHub] [kafka] jolshan merged pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


jolshan merged PR #13666:
URL: https://github.com/apache/kafka/pull/13666


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

2023-06-01 Thread via GitHub


jolshan merged PR #13769:
URL: https://github.com/apache/kafka/pull/13769


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

2023-06-01 Thread via GitHub


jolshan commented on PR #13769:
URL: https://github.com/apache/kafka/pull/13769#issuecomment-1572734605

   The test failures look unrelated. Will merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213590037


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+private class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: 
").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();
+if (event != null) {
+try {
+log.debug("Executing event: {}.", event);
+event.run();
+} catch (Throwable t) {
+log.error("Failed to run event {} due to: {}.", event, 
t.getMessage(), t);
+event.complete(t);
+} finally {
+accumulator.done(event);
+}
+}
+}
+}
+
+private void drainEvents() {
+CoordinatorEvent event = accumulator.poll(0, 
TimeUnit.MILLISECONDS);
+while (event != null) {
+try {
+log.debug("Draining event: {}.", event);
+event.complete(new 
RejectedExecutionException("EventProcessor is closed."));
+} catch (Throwable t) {
+log.error("Failed to reject event {} due to: {}.", event, 
t.getMessage(), t);
+} finally {
+accumulator.done(event);
+}
+
+event = accumulator.poll(0, TimeUnit.MILLISECONDS);
+}
+}
+
+@Override
+public void run() {
+log.info("Starting");
+
+try {
+handleEvents();
+} catch (Throwable t) {
+log.error("Exiting with exception.", t);
+}
+
+// The accumulator is drained and all the pending events are 
rejected
+// when the event processor is s

[GitHub] [kafka] C0urante merged pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark

2023-06-01 Thread via GitHub


C0urante merged PR #13776:
URL: https://github.com/apache/kafka/pull/13776


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark

2023-06-01 Thread via GitHub


C0urante commented on code in PR #13776:
URL: https://github.com/apache/kafka/pull/13776#discussion_r1213569347


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.connect;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.ReplaceField;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+
+/**
+ * This benchmark tests the performance of the {@link ReplaceField} {@link 
org.apache.kafka.connect.transforms.Transformation SMT}
+ * when configured with a large number of include and exclude fields and 
applied on a {@link SourceRecord} containing a similarly
+ * large number of fields.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ReplaceFieldBenchmark {
+
+@Param({"100", "1000", "1"})
+private int fieldCount;
+private ReplaceField replaceFieldSmt;
+private SourceRecord record;
+
+@Setup
+public void setup() {
+this.replaceFieldSmt = new ReplaceField.Value<>();
+Map replaceFieldConfigs = new HashMap<>();
+replaceFieldConfigs.put("exclude",
+IntStream.range(0, fieldCount).filter(x -> (x & 1) == 
0).mapToObj(x -> "Field-" + x).collect(Collectors.joining(",")));
+replaceFieldConfigs.put("include",
+IntStream.range(0, fieldCount).filter(x -> (x & 1) == 
1).mapToObj(x -> "Field-" + x).collect(Collectors.joining(",")));

Review Comment:
   Awesome, thanks!
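
   For readers skimming the archive, a small self-contained sketch of the SMT usage 
that the benchmark exercises; the configuration keys mirror the snippet above, while 
the topic name and field values are made up for illustration:

```
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.ReplaceField;

import java.util.HashMap;
import java.util.Map;

public class ReplaceFieldExample {
    public static void main(String[] args) {
        // Configure the Value variant of the SMT with include/exclude lists,
        // mirroring what the benchmark's setup() does on a smaller scale.
        ReplaceField<SourceRecord> smt = new ReplaceField.Value<>();
        Map<String, String> config = new HashMap<>();
        config.put("exclude", "Field-0,Field-2");
        config.put("include", "Field-1,Field-3");
        smt.configure(config);

        // Apply it to a schemaless record whose value is a Map of fields.
        Map<String, Object> value = new HashMap<>();
        for (int i = 0; i < 4; i++) {
            value.put("Field-" + i, i);
        }
        SourceRecord record = new SourceRecord(null, null, "topic", null, value);
        SourceRecord transformed = smt.apply(record);
        System.out.println(transformed.value()); // only Field-1 and Field-3 remain
        smt.close();
    }
}
```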



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] novosibman commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-06-01 Thread via GitHub


novosibman commented on PR #13782:
URL: https://github.com/apache/kafka/pull/13782#issuecomment-1572620634

   Provided an updated change:
   restored the original try-with-resources for writing and added a utility method 
for flushing:
   ```
   try (FileChannel fileChannel = FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
   fileChannel.write(buffer);
   }
   if (scheduler != null) {
   scheduler.scheduleOnce("flush-producer-snapshot", () -> 
Utils.flushFileQuietly(file.toPath(), "producer-snapshot"));
   } else {
   Utils.flushFileQuietly(file.toPath(), "producer-snapshot");
   }
   ```
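
   For context, a minimal sketch of what a quiet flush helper along these lines could 
look like; the method name is taken from the snippet above, while the body, signature, 
and error handling are assumptions rather than the final Utils API:

```
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FlushExample {
    // Hypothetical helper sketched from the snippet above: fsync the file at
    // the given path and report, rather than propagate, any IOException.
    static void flushFileQuietly(final Path path, final String name) {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
            channel.force(true); // flush file content and metadata to disk
        } catch (final IOException e) {
            System.err.println("Failed to flush " + name + " file at " + path + ": " + e);
        }
    }
}
```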
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-06-01 Thread via GitHub


jsancio commented on code in PR #13765:
URL: https://github.com/apache/kafka/pull/13765#discussion_r1213501789


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1087,12 +1087,14 @@ class Partition(val topicPartition: TopicPartition,
 // avoid unnecessary collection generation
 val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
 var newHighWatermark = leaderLogEndOffset
-remoteReplicasMap.values.foreach { replica =>
+remoteReplicasMap.foreachEntry { (replicaId, replica) =>

Review Comment:
   Done and done. I confirmed that the test I added fails for the "fenced" and 
"shutdown" variants against trunk.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13794:
URL: https://github.com/apache/kafka/pull/13794#discussion_r1213491922


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Replayable.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+/**
+ * The Replayable interface.
+ */

Review Comment:
   Nit: could we add a bit more detail here? Maybe something about replaying 
records to generate state, etc.
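
   As a hedged illustration of what the requested detail could look like in the 
Javadoc, assuming a single replay method (the type parameter and method shown are 
assumptions based on the PR title, not the committed interface):

```
// Sketch only: the record type parameter and the replay method shown here
// are assumptions, not the committed interface.

/**
 * The Replayable interface. An implementation replays records read back from
 * the underlying partition (for example while the coordinator is loading it)
 * and applies each one to rebuild its in-memory state.
 */
public interface Replayable<U> {
    /**
     * Applies a single record to the state machine.
     *
     * @param record The record to replay.
     */
    void replay(U record);
}
```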



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces

2023-06-01 Thread via GitHub


jolshan commented on PR #13794:
URL: https://github.com/apache/kafka/pull/13794#issuecomment-1572526297

   Let's add a brief explanation of the rationale for these interfaces to the 
PR description.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13793: KAFKA-14462; [15/N] Make Result generic and rename it

2023-06-01 Thread via GitHub


dajac commented on PR #13793:
URL: https://github.com/apache/kafka/pull/13793#issuecomment-1572521296

   @jolshan Updated the description. This is basically a minor refactoring.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13793: KAFKA-14462; [15/N] Make Result generic and rename it

2023-06-01 Thread via GitHub


jolshan commented on PR #13793:
URL: https://github.com/apache/kafka/pull/13793#issuecomment-1572514821

   Can we update the description to give the rationale for this change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213474996


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+
+/**
+ * Serializer which serializes {{@link Record}} to bytes.
+ */
+public class RecordSerializer implements PartitionWriter.Serializer {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(

Review Comment:
   I guess I was wondering why we have a unique class here for this. It makes 
it seem like we would want other implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213473971


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Append the messages to the local replica logs
+   * Append the messages to the local replica logs. 
ReplicaManager#appendRecords should usually be
+   * used instead of this method.

Review Comment:
   Those reasons make sense. The append path is pretty complex, so an overall 
refactor could do some good, but it doesn't need to be a blocker here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213469954


##
core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.cluster.PartitionListener
+import kafka.server.{ReplicaManager, RequestLocal}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
TimestampType}
+import org.apache.kafka.common.record.Record.EMPTY_HEADERS
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter
+import org.apache.kafka.storage.internals.log.AppendOrigin
+
+import java.nio.ByteBuffer
+import java.util
+import scala.collection.Map
+
+private[group] class ListenerAdaptor(
+  val listener: PartitionWriter.Listener
+) extends PartitionListener {
+  override def onHighWatermarkUpdated(
+tp: TopicPartition,
+offset: Long
+  ): Unit = {
+listener.onHighWatermarkUpdated(tp, offset)
+  }
+
+  override def equals(that: Any): Boolean = that match {
+case other: ListenerAdaptor => listener.equals(other.listener)
+case _ => false
+  }
+
+  override def hashCode(): Int = {
+listener.hashCode()
+  }
+
+  override def toString: String = {
+s"ListenerAdaptor(listener=$listener)"
+  }
+}
+
+class PartitionWriterImpl[T](
+  replicaManager: ReplicaManager,
+  serializer: PartitionWriter.Serializer[T],
+  compressionType: CompressionType,
+  time: Time
+) extends PartitionWriter[T] {
+
+  override def registerListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener))
+  }
+
+  override def deregisterListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.removeListener(tp, new ListenerAdaptor(listener))
+  }
+
+  override def append(
+tp: TopicPartition,
+records: util.List[T]
+  ): Long = {
+if (records.isEmpty) {
+  throw new IllegalStateException("records must be non-empty.")
+}
+
+replicaManager.getLogConfig(tp) match {
+  case Some(logConfig) =>
+val magic = logConfig.recordVersion.value
+val maxBatchSize = logConfig.maxMessageSize
+val currentTimeMs = time.milliseconds()
+
+val recordsBuilder = MemoryRecords.builder(
+  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+  magic,
+  compressionType,
+  TimestampType.CREATE_TIME,
+  0L,
+  maxBatchSize
+)
+
+records.forEach { record =>
+  val keyBytes = serializer.serializeKey(record)
+  val valueBytes = serializer.serializeValue(record)
+
+  if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, 
EMPTY_HEADERS)) {
+recordsBuilder.append(
+  currentTimeMs,
+  keyBytes,
+  valueBytes,
+  EMPTY_HEADERS
+)
+  } else {
+throw new RecordTooLargeException(s"Message batch size is 
${recordsBuilder.estimatedSizeInBytes()} bytes " +
+  s"in append to partition $tp which exceeds the maximum 
configured size of $maxBatchSize.")
+  }
+}
+
+val appendResults = replicaManager.appendToLocalLog(

Review Comment:
   I'm not sure whether it would be better to include a comment or encode it in 
the name somehow. Hard-coding is fine as long as it isn't a surprise 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213460495


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+private class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: 
").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();
+if (event != null) {
+try {
+log.debug("Executing event: {}.", event);
+event.run();
+} catch (Throwable t) {
+log.error("Failed to run event {} due to: {}.", event, 
t.getMessage(), t);
+event.complete(t);
+} finally {
+accumulator.done(event);
+}
+}
+}
+}
+
+private void drainEvents() {
+CoordinatorEvent event = accumulator.poll(0, 
TimeUnit.MILLISECONDS);
+while (event != null) {
+try {
+log.debug("Draining event: {}.", event);
+event.complete(new 
RejectedExecutionException("EventProcessor is closed."));
+} catch (Throwable t) {
+log.error("Failed to reject event {} due to: {}.", event, 
t.getMessage(), t);
+} finally {
+accumulator.done(event);
+}
+
+event = accumulator.poll(0, TimeUnit.MILLISECONDS);
+}
+}
+
+@Override
+public void run() {
+log.info("Starting");
+
+try {
+handleEvents();
+} catch (Throwable t) {
+log.error("Exiting with exception.", t);
+}
+
+// The accumulator is drained and all the pending events are 
rejected
+// when the event processor is

[GitHub] [kafka] philipnee closed pull request #13605: KAFKA-14950: implement assign() and assignment()

2023-06-01 Thread via GitHub


philipnee closed pull request #13605: KAFKA-14950: implement assign() and 
assignment()
URL: https://github.com/apache/kafka/pull/13605


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13605: KAFKA-14950: implement assign() and assignment()

2023-06-01 Thread via GitHub


philipnee commented on PR #13605:
URL: https://github.com/apache/kafka/pull/13605#issuecomment-1572469830

   Closing this in light of https://github.com/apache/kafka/pull/13797


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #13797: KAFKA-14950: implement assign() and assignment()

2023-06-01 Thread via GitHub


philipnee opened a new pull request, #13797:
URL: https://github.com/apache/kafka/pull/13797

   In this PR I implemented assign() and assignment() and ported the original 
tests from KafkaConsumerTest.java.
   
   Differences from the original implementation:
   
   A commit event is explicitly sent to the background thread to commit 
progress.
   A MetadataUpdate is also sent to the background thread.
   
   Missing:
   
   Fetcher changes to clear the buffer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-01 Thread via GitHub


vcrfxia commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1212267772


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final Duration gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRec;
+private Serde keySerde;
+private FullChangeSerde valueSerde;

Review Comment:
   Hmm, interesting that the serde type here is for `Change<V>` instead of 
plain `V`. Looks like the reason that's the case is because 
`TimeOrderedKeyValueBuffer` stores `Change<V>`. What do you think about 
updating `TimeOrderedKeyValueBuffer` to take a third generic type to represent 
the type stored in the actual buffer itself, in order to allow this new buffer 
implementation to simply store `V` instead? (The buffer type would be 
`Change<V>` for the existing implementations, and just `V` for this new 
implementation.)
   
   That would simplify a lot of code in this class which doesn't have use cases 
for old values anyway (also evidenced by the fact that 
`priorValueForBuffered()` always returns null), and callers of this class won't 
need to pass in `null` for the old value in `Change<V>` either. It'll also make 
the serialized value smaller, since we won't have to serialize in nulls for 
`oldValue` and `priorValue` anymore.
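
   A minimal sketch of the suggested shape, using simplified stand-in types rather 
than the real Streams interfaces:

```
// Simplified stand-in for the buffer interface, just to show the proposal:
// a third type parameter T for the type the buffer actually stores.
interface TimeOrderedKeyValueBuffer<K, V, T> {
    void put(long time, K key, T value); // the real signature differs
}

// The existing in-memory implementation would keep storing the full change:
//   class InMemoryTimeOrderedKeyValueBuffer<K, V>
//       implements TimeOrderedKeyValueBuffer<K, V, Change<V>> { ... }
//
// The new RocksDB-backed buffer would store plain V, so callers no longer
// wrap values in a Change with a null old value, and the serialized form
// stays smaller:
//   class RocksDBTimeOrderedKeyValueBuffer<K, V>
//       implements TimeOrderedKeyValueBuffer<K, V, V> { ... }
```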



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13769:
URL: https://github.com/apache/kafka/pull/13769#discussion_r1213389903


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##
@@ -93,17 +93,22 @@ class AddPartitionsToTxnManagerTest {
 
 val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]()
 val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]()
+val transaction1AgainErrorsOldEpochAgain = mutable.Map[TopicPartition, 
Errors]()

Review Comment:
   We are adding transaction 1 again (retry cases) once with old epoch, once 
with new epoch, and again with an older epoch.
   
   I can rename and add comments to make this clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

2023-06-01 Thread via GitHub


jolshan commented on code in PR #13769:
URL: https://github.com/apache/kafka/pull/13769#discussion_r1213389003


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##
@@ -93,17 +93,22 @@ class AddPartitionsToTxnManagerTest {
 
 val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]()
 val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]()
+val transaction1AgainErrorsOldEpochAgain = mutable.Map[TopicPartition, 
Errors]()
+
 // Trying to add more transactional data for the same transactional ID, 
producer ID, and epoch should simply replace the old data and send a retriable 
response.
 addPartitionsToTxnManager.addTxnData(node0, 
transactionData(transactionalId1, producerId1), 
setErrors(transaction1AgainErrorsOldEpoch))
 val expectedNetworkErrors = topicPartitions.map(_ -> 
Errors.NETWORK_EXCEPTION).toMap
 assertEquals(expectedNetworkErrors, transaction1Errors)

Review Comment:
   I never populate those fields. 
   We either populate the field via the callback when we return early due to 
the epoch/retry case, or we return after receiving the response. Since this test 
is just checking the addTxnData path, I didn't mock receiving a response. 
Because of that, the callback is never executed and the field is not populated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213267178


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Append the messages to the local replica logs
+   * Append the messages to the local replica logs. 
ReplicaManager#appendRecords should usually be
+   * used instead of this method.

Review Comment:
   I did not use it here because it puts stuff into the action queue and I don't 
want/need that in my case. The callback is also annoying. Let me circle back on 
this to see if I can do better. I wrote this a while ago and I admit that I went 
with the easiest solution back then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft

2023-06-01 Thread via GitHub


mimaison merged PR #13792:
URL: https://github.com/apache/kafka/pull/13792


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration

2023-06-01 Thread via GitHub


jsancio commented on code in PR #13788:
URL: https://github.com/apache/kafka/pull/13788#discussion_r1213346871


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -33,6 +33,92 @@
 
 
 public class PartitionRegistration {
+
+/**
+ * A builder class which creates a PartitionRegistration.
+ */
+static public class Builder {
+private int[] replicas;
+private int[] isr;
+private int[] removingReplicas = Replicas.NONE;
+private int[] addingReplicas = Replicas.NONE;
+private Integer leader;
+private LeaderRecoveryState leaderRecoveryState;
+private Integer leaderEpoch;
+private Integer partitionEpoch;
+
+public Builder setReplicas(int[] replicas) {
+this.replicas = replicas;
+return this;
+}
+
+public Builder setIsr(int[] isr) {
+this.isr = isr;
+return this;
+}
+
+public Builder setRemovingReplicas(int[] removingReplicas) {
+this.removingReplicas = removingReplicas;
+return this;
+}
+
+public Builder setAddingReplicas(int[] addingReplicas) {
+this.addingReplicas = addingReplicas;
+return this;
+}
+
+public Builder setLeader(Integer leader) {
+this.leader = leader;
+return this;
+}
+
+public Builder setLeaderRecoveryState(LeaderRecoveryState 
leaderRecoveryState) {
+this.leaderRecoveryState = leaderRecoveryState;
+return this;
+}
+
+public Builder setLeaderEpoch(Integer leaderEpoch) {
+this.leaderEpoch = leaderEpoch;
+return this;
+}
+
+public Builder setPartitionEpoch(Integer partitionEpoch) {
+this.partitionEpoch = partitionEpoch;
+return this;
+}
+
+public PartitionRegistration build() {
+if (replicas == null) {
+throw new IllegalStateException("You must set replicas.");
+} else if (isr == null) {
+throw new IllegalStateException("You must set isr.");
+} else if (removingReplicas == null) {
+throw new IllegalStateException("You must set removing 
replicas.");
+} else if (addingReplicas == null) {
+throw new IllegalStateException("You must set adding 
replicas.");
+} else if (leader == null) {
+throw new IllegalStateException("You must set leader.");
+} else if (leaderRecoveryState == null) {
+throw new IllegalStateException("You must set leader recovery 
state.");

Review Comment:
   We discussed this PR offline. The change is good as is and there is not much 
benefit in adding additional default values to the builder.
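
   For illustration, a hypothetical use of the builder from the diff above; the 
values are placeholders, and removingReplicas/addingReplicas fall back to their 
Replicas.NONE defaults:

```
PartitionRegistration registration = new PartitionRegistration.Builder()
    .setReplicas(new int[]{1, 2, 3})
    .setIsr(new int[]{1, 2, 3})
    .setLeader(1)
    .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
    .setLeaderEpoch(0)
    .setPartitionEpoch(0)
    .build(); // throws IllegalStateException if a required field is unset
```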



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft

2023-06-01 Thread via GitHub


mimaison commented on code in PR #13792:
URL: https://github.com/apache/kafka/pull/13792#discussion_r1213336329


##
docs/upgrade.html:
##
@@ -59,6 +59,65 @@ Notable changes in 3
 
 
 
+Upgrading 
ZooKeeper-based clusters
+If you are upgrading from a version prior to 2.1.x, please see the 
note below about the change to the schema used to store consumer offsets.
+Once you have changed the inter.broker.protocol.version to the latest 
version, it will not be possible to downgrade to a version prior to 2.1.
+
+For a rolling upgrade:
+
+
+Update server.properties on all brokers and add the following 
properties. CURRENT_KAFKA_VERSION refers to the version you
+are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the 
message format version currently in use. If you have previously
+overridden the message format version, you should keep its current 
value. Alternatively, if you are upgrading from a version prior
+to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to 
match CURRENT_KAFKA_VERSION.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
3.4, 3.3, etc.)
+log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  
(See potential performance impact
+following the upgrade for the details on what this 
configuration does.)
+
+If you are upgrading from version 0.11.0.x or above, and you have 
not overridden the message format, then you only need to override
+the inter-broker protocol version.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
3.4, 3.3, etc.)
+
+
+Upgrade the brokers one at a time: shut down the broker, update 
the code, and restart it. Once you have done so, the
+brokers will be running the latest version and you can verify that 
the cluster's behavior and performance meets expectations.
+It is still possible to downgrade at this point if there are any 
problems.
+
+Once the cluster's behavior and performance has been verified, 
bump the protocol version by editing
+inter.broker.protocol.version and setting it to 
3.5.
+
+Restart the brokers one by one for the new protocol version to 
take effect. Once the brokers begin using the latest
+protocol version, it will no longer be possible to downgrade the 
cluster to an older version.
+
+If you have overridden the message format version as instructed 
above, then you need to do one more rolling restart to
+upgrade it to its latest version. Once all (or most) consumers 
have been upgraded to 0.11.0 or later,
+change log.message.format.version to 3.5 on each broker and 
restart them one by one. Note that the older Scala clients,
+which are no longer maintained, do not support the message format 
introduced in 0.11, so to avoid conversion costs
+(or to take advantage of exactly once semantics),
+the newer Java clients must be used.
+
+
+
+Upgrading 
KRaft-based clusters
+If you are upgrading from a version prior to 3.3.0, please see the 
note below. Once you have changed the metadata.version to the latest version, 
it will not be possible to downgrade to a version prior to 3.3-IV0.
+
+For a rolling upgrade:
+
+
+Upgrade the brokers one at a time: shut down the broker, update 
the code, and restart it. Once you have done so, the
+brokers will be running the latest version and you can verify that 
the cluster's behavior and performance meets expectations.
+
+Once the cluster's behavior and performance has been verified, 
bump the metadata.version by running
+
+./bin/kafka-features.sh upgrade --metadata 3.5
+
+
+Note that the cluster metadata version cannot be downgraded to a 
pre-production 3.0.x, 3.1.x, or 3.2.x version once it has been upgraded.
+However, it is possible to downgrade to production versions such 
as 3.3-IV0, 3.3-IV1, etc.

Review Comment:
   This is copied from the upgrade steps for 3.4. I agree this could be 
clarified but I'd prefer to keep it as is for now so I can backport this to 
3.5. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-06-01 Thread via GitHub


jsancio commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1572275711

   Thanks for your feedback @divijvaidya @dajac. I am replying to both of your 
comments in this message.
   > With this change, we are changing the semantics of what a leadership epoch 
means. Prior to this change, leadership epoch is a version number representing 
membership of an ISR. As soon as membership changes, this version changes.
   
   @divijvaidya, The old code was increasing the leader epoch when the ISR 
shrinks but not when the ISR expands. My understanding is that we were doing this 
because the old replica manager used leader epoch bump to invalidate old 
fetchers. During shutdown the fetchers needed to be invalidated to avoid having 
them rejoin the ISR. With KIP-841, this is no longer necessary as we can reject 
brokers that are shutting down from joining the ISR and modifying the HWM.
   
   Part of the code for doing this already exists; what we are missing, and what 
part of this PR fixes, is considering this state when advancing the HWM. The 
partition leader should not include shutting down replicas when determining the 
HWM.
   
   > After this change, the definition has changed to - leadership epoch is a 
version number that represents member of an ISR "in some cases". As you can 
see, the new definition has added ifs and buts to the simple definition above. 
Hence, I am not in favour of changing this.
   
   @divijvaidya, For correctness, the main requirement is that the leader epoch 
is increased whenever the leader changes. This is needed for log truncation and 
reconciliation. For log consistency, log truncation and reconciliation assumes 
that the tuples (offset, epoch) are unique per topic partition and that if the 
tuple (offset, epoch) match in two replicas then their log up to that offset 
also match. In my opinion, for correctness Kafka doesn't require that the 
leader epoch is increased when the ISR changes.
   
   > As you can see there, we only reset the followers' states when the leader 
epoch is bumped. I suppose that this is why you stumbled upon this issue with 
having shutting down replicas holding back advancing the HWM. The issue is that 
the shutting down replica's state is not reset so it remains caught-up for 
replica.lag.time.max.ms. I think that we need to update Partition.makeLeader to 
always update the followers' states. Obviously, we also need your changes to 
not consider fenced and shutting down replicas in the HWM computation.
   
   @dajac, Yes, I thought about this when I was implementing this feature. I 
decided against it because the follower (the shutting-down replica) is technically 
"caught up" to the leader; we simply don't want the leader to wait for the 
replica when computing the HWM since we know it will soon be shutting down its 
fetchers.
   
   > I wonder if we really need this if we change Partition.makeLeader as 
explained. It seems to me that the change in Partition.makeLeader and 
Partition.maybeIncrementLeaderHW together should be backward compatible. What 
do you think?
   
   We need the MV check in the controller even with your suggestion. The 
question is "When is it beneficial for the controller to not increase the 
leader epoch when a replica is removed from the ISR because of shutdown?" This 
is only the case when the controller knows that the brokers have the replica 
manager fixes in this PR. That is guaranteed to be the case if the MV is greater 
than the MV introduced in this PR.
   
   If the brokers don't contain the fixes in this PR and the controller doesn't 
bump the leader epoch, PRODUCE requests will time out because the HWM increase 
will be delayed.
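
   To make the HWM point concrete, an illustrative-only sketch (simplified, 
hypothetical types; not the Partition.scala code) of a leader skipping fenced or 
shutting-down replicas when advancing the high watermark, instead of bumping the 
leader epoch to evict their fetchers:

```
interface ReplicaState {
    boolean isFenced();
    boolean isInControlledShutdown();
    long logEndOffset();
}

final class HighWatermarkExample {
    // The HWM is the minimum log end offset across the leader and the
    // eligible followers; fenced or shutting-down replicas are skipped so
    // they cannot hold the HWM back while they wind down their fetchers.
    static long computeHighWatermark(final long leaderLogEndOffset,
                                     final Iterable<ReplicaState> followers) {
        long newHighWatermark = leaderLogEndOffset;
        for (final ReplicaState replica : followers) {
            if (replica.isFenced() || replica.isInControlledShutdown()) {
                continue;
            }
            newHighWatermark = Math.min(newHighWatermark, replica.logEndOffset());
        }
        return newHighWatermark;
    }
}
```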


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft

2023-06-01 Thread via GitHub


mimaison commented on code in PR #13792:
URL: https://github.com/apache/kafka/pull/13792#discussion_r121808


##
docs/upgrade.html:
##
@@ -59,6 +59,65 @@ Notable changes in 3
 
 
 
+Upgrading 
ZooKeeper-based clusters
+If you are upgrading from a version prior to 2.1.x, please see the 
note below about the change to the schema used to store consumer offsets.

Review Comment:
   I changed it to `the note in step 3 below`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft

2023-06-01 Thread via GitHub


mimaison commented on code in PR #13792:
URL: https://github.com/apache/kafka/pull/13792#discussion_r1213332860


##
docs/upgrade.html:
##
@@ -59,6 +59,65 @@ Notable changes in 3
 
 
 
+Upgrading 
ZooKeeper-based clusters
+If you are upgrading from a version prior to 2.1.x, please see the 
note below about the change to the schema used to store consumer offsets.
+Once you have changed the inter.broker.protocol.version to the latest 
version, it will not be possible to downgrade to a version prior to 2.1.
+
+For a rolling upgrade:
+
+
+Update server.properties on all brokers and add the following 
properties. CURRENT_KAFKA_VERSION refers to the version you
+are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the 
message format version currently in use. If you have previously
+overridden the message format version, you should keep its current 
value. Alternatively, if you are upgrading from a version prior
+to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to 
match CURRENT_KAFKA_VERSION.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
3.4, 3.3, etc.)
+log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  
(See potential performance impact
+following the upgrade for the details on what this 
configuration does.)
+
+If you are upgrading from version 0.11.0.x or above, and you have 
not overridden the message format, then you only need to override
+the inter-broker protocol version.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
3.4, 3.3, etc.)
+
+
+Upgrade the brokers one at a time: shut down the broker, update 
the code, and restart it. Once you have done so, the
+brokers will be running the latest version and you can verify that 
the cluster's behavior and performance meets expectations.
+It is still possible to downgrade at this point if there are any 
problems.
+
+Once the cluster's behavior and performance has been verified, 
bump the protocol version by editing
+inter.broker.protocol.version and setting it to 
3.5.
+
+Restart the brokers one by one for the new protocol version to 
take effect. Once the brokers begin using the latest
+protocol version, it will no longer be possible to downgrade the 
cluster to an older version.
+
+If you have overridden the message format version as instructed 
above, then you need to do one more rolling restart to
+upgrade it to its latest version. Once all (or most) consumers 
have been upgraded to 0.11.0 or later,
+change log.message.format.version to 3.5 on each broker and 
restart them one by one. Note that the older Scala clients,
+which are no longer maintained, do not support the message format 
introduced in 0.11, so to avoid conversion costs
+(or to take advantage of exactly once semantics),
+the newer Java clients must be used.
+
+
+
+Upgrading 
KRaft-based clusters
+If you are upgrading from a version prior to 3.3.0, please see the 
note below. Once you have changed the metadata.version to the latest version, 
it will not be possible to downgrade to a version prior to 3.3-IV0.

Review Comment:
   I changed it to `the note in step 5 below`



##
docs/upgrade.html:
##
@@ -59,6 +59,65 @@ Notable changes in 3
 
 
 
+Upgrading 
ZooKeeper-based clusters
+If you are upgrading from a version prior to 2.1.x, please see the 
note below about the change to the schema used to store consumer offsets.
+Once you have changed the inter.broker.protocol.version to the latest 
version, it will not be possible to downgrade to a version prior to 2.1.
+
+For a rolling upgrade:
+
+
+Update server.properties on all brokers and add the following 
properties. CURRENT_KAFKA_VERSION refers to the version you
+are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the 
message format version currently in use. If you have previously
+overridden the message format version, you should keep its current 
value. Alternatively, if you are upgrading from a version prior
+to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to 
match CURRENT_KAFKA_VERSION.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
3.4, 3.3, etc.)
+log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  
(See potential performance impact
+following the upgrade for the details on what this 
configuration does.)
+
+If you are upgrading from version 0.11.0.x or above, and you have 
not overridden the message format, then you only need to

[jira] [Commented] (KAFKA-15017) New ClientQuotas are not written to ZK from snapshot

2023-06-01 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728390#comment-17728390
 ] 

Proven Provenzano commented on KAFKA-15017:
---

Fix is on 3.5 branch.

> New ClientQuotas are not written to ZK from snapshot 
> -
>
> Key: KAFKA-15017
> URL: https://issues.apache.org/jira/browse/KAFKA-15017
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: David Arthur
>Assignee: Proven Provenzano
>Priority: Critical
>
> Similar issue to KAFKA-15009



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15017) New ClientQuotas are not written to ZK from snapshot

2023-06-01 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano resolved KAFKA-15017.
---
Resolution: Fixed

> New ClientQuotas are not written to ZK from snapshot 
> -
>
> Key: KAFKA-15017
> URL: https://issues.apache.org/jira/browse/KAFKA-15017
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: David Arthur
>Assignee: Proven Provenzano
>Priority: Critical
>
> Similar issue to KAFKA-15009



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] andymg3 commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration

2023-06-01 Thread via GitHub


andymg3 commented on code in PR #13788:
URL: https://github.com/apache/kafka/pull/13788#discussion_r1213276160


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -33,6 +33,92 @@
 
 
 public class PartitionRegistration {
+
+/**
+ * A builder class which creates a PartitionRegistration.
+ */
+static public class Builder {
+private int[] replicas;
+private int[] isr;
+private int[] removingReplicas = Replicas.NONE;
+private int[] addingReplicas = Replicas.NONE;
+private Integer leader;
+private LeaderRecoveryState leaderRecoveryState;
+private Integer leaderEpoch;
+private Integer partitionEpoch;
+
+public Builder setReplicas(int[] replicas) {
+this.replicas = replicas;
+return this;
+}
+
+public Builder setIsr(int[] isr) {
+this.isr = isr;
+return this;
+}
+
+public Builder setRemovingReplicas(int[] removingReplicas) {
+this.removingReplicas = removingReplicas;
+return this;
+}
+
+public Builder setAddingReplicas(int[] addingReplicas) {
+this.addingReplicas = addingReplicas;
+return this;
+}
+
+public Builder setLeader(Integer leader) {
+this.leader = leader;
+return this;
+}
+
+public Builder setLeaderRecoveryState(LeaderRecoveryState 
leaderRecoveryState) {
+this.leaderRecoveryState = leaderRecoveryState;
+return this;
+}
+
+public Builder setLeaderEpoch(Integer leaderEpoch) {
+this.leaderEpoch = leaderEpoch;
+return this;
+}
+
+public Builder setPartitionEpoch(Integer partitionEpoch) {
+this.partitionEpoch = partitionEpoch;
+return this;
+}
+
+public PartitionRegistration build() {
+if (replicas == null) {
+throw new IllegalStateException("You must set replicas.");
+} else if (isr == null) {
+throw new IllegalStateException("You must set isr.");
+} else if (removingReplicas == null) {
+throw new IllegalStateException("You must set removing 
replicas.");
+} else if (addingReplicas == null) {
+throw new IllegalStateException("You must set adding 
replicas.");
+} else if (leader == null) {
+throw new IllegalStateException("You must set leader.");
+} else if (leaderRecoveryState == null) {
+throw new IllegalStateException("You must set leader recovery 
state.");

Review Comment:
   Should we go a step further and not require `leader`, `isr`, `leaderEpoch` 
and `partitionEpoch`? 
   
   We could default to using the first replica in `replicas` for the leader, 
default to all replicas being in the ISR, and default `leaderEpoch` and 
`partitionEpoch` to 0. It looks like we really are just calling `build` during 
topic creation.
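   
   For illustration, a minimal sketch of that defaulting with a hypothetical 
stand-in class (not the actual PartitionRegistration.Builder): the leader falls 
back to the first replica, the ISR to all replicas, and both epochs to 0.
   
   import java.util.Arrays;
   
   // Hypothetical stand-in for the builder, only to show the defaulting idea.
   final class BuilderDefaultsSketch {
       private int[] replicas;
       private int[] isr;
       private Integer leader;
       private Integer leaderEpoch;
       private Integer partitionEpoch;
   
       BuilderDefaultsSketch setReplicas(int[] replicas) { this.replicas = replicas; return this; }
       BuilderDefaultsSketch setIsr(int[] isr) { this.isr = isr; return this; }
       BuilderDefaultsSketch setLeader(Integer leader) { this.leader = leader; return this; }
   
       String build() {
           if (replicas == null) throw new IllegalStateException("You must set replicas.");
           int[] effectiveIsr = isr != null ? isr : replicas.clone();    // default: all replicas in the ISR
           int effectiveLeader = leader != null ? leader : replicas[0];  // default: first replica leads
           int effectiveLeaderEpoch = leaderEpoch != null ? leaderEpoch : 0;
           int effectivePartitionEpoch = partitionEpoch != null ? partitionEpoch : 0;
           return "leader=" + effectiveLeader + ", isr=" + Arrays.toString(effectiveIsr)
               + ", leaderEpoch=" + effectiveLeaderEpoch + ", partitionEpoch=" + effectivePartitionEpoch;
       }
   
       public static void main(String[] args) {
           // Only replicas are required; everything else is defaulted.
           System.out.println(new BuilderDefaultsSketch().setReplicas(new int[]{1, 2, 3}).build());
       }
   }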



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213274631


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+
+/**
+ * Serializer which serializes {{@link Record}} to bytes.
+ */
+public class RecordSerializer implements PartitionWriter.Serializer {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(

Review Comment:
   We use the very same code, `MessageUtil.toVersionPrefixedBytes`, in Scala.
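   
   As a rough illustration of what a version-prefixed key amounts to (a 
hypothetical helper, not the actual MessageUtil implementation): a two-byte 
schema version followed by the serialized payload, so readers can pick the 
right schema when reading the key back.
   
   import java.nio.ByteBuffer;
   
   final class VersionPrefixedBytesSketch {
       // Hypothetical helper mirroring the idea: 2-byte version, then the payload.
       static byte[] toVersionPrefixedBytes(short version, byte[] payload) {
           ByteBuffer buffer = ByteBuffer.allocate(2 + payload.length);
           buffer.putShort(version);
           buffer.put(payload);
           return buffer.array();
       }
   
       public static void main(String[] args) {
           byte[] key = toVersionPrefixedBytes((short) 3, new byte[]{1, 2, 3});
           System.out.println(key.length); // 5 = 2-byte version + 3-byte payload
       }
   }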



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213273088


##
core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala:
##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.{LogAppendResult, ReplicaManager, RequestLocal}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, 
RecordTooLargeException}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
RecordBatch, RecordConversionStats}
+import org.apache.kafka.common.utils.{MockTime, Time}
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LeaderHwChange, 
LogAppendInfo, LogConfig}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Test
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify, when}
+
+import java.nio.charset.Charset
+import java.util.{Collections, Optional, OptionalInt, Properties}
+import scala.jdk.CollectionConverters._
+
+class StringKeyValueSerializer extends PartitionWriter.Serializer[(String, 
String)] {
+  override def serializeKey(record: (String, String)): Array[Byte] = {
+record._1.getBytes(Charset.defaultCharset())
+  }
+
+  override def serializeValue(record: (String, String)): Array[Byte] = {
+record._2.getBytes(Charset.defaultCharset())
+  }
+}
+
+class PartitionWriterImplTest {
+  @Test
+  def testRegisterDeregisterListener(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val partitionRecordWriter = new PartitionWriterImpl(
+  replicaManager,
+  new StringKeyValueSerializer(),
+  CompressionType.NONE,
+  Time.SYSTEM
+)
+
+val listener = new PartitionWriter.Listener {
+  override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): 
Unit = {}
+}
+
+partitionRecordWriter.registerListener(tp, listener)
+verify(replicaManager).maybeAddListener(tp, new ListenerAdaptor(listener))
+
+partitionRecordWriter.deregisterListener(tp, listener)
+verify(replicaManager).removeListener(tp, new ListenerAdaptor(listener))
+
+assertEquals(
+  new ListenerAdaptor(listener),
+  new ListenerAdaptor(listener)
+)
+assertEquals(
+  new ListenerAdaptor(listener).hashCode(),
+  new ListenerAdaptor(listener).hashCode()
+)
+  }
+
+  @Test
+  def testWriteRecords(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val time = new MockTime()
+val partitionRecordWriter = new PartitionWriterImpl(
+  replicaManager,
+  new StringKeyValueSerializer(),
+  CompressionType.NONE,
+  time
+)
+
+when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
+  Collections.emptyMap(),
+  new Properties()
+)))
+
+val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+  ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+when(replicaManager.appendToLocalLog(
+  ArgumentMatchers.eq(true),
+  ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+  recordsCapture.capture(),
+  ArgumentMatchers.eq(1),
+  ArgumentMatchers.eq(RequestLocal.NoCaching)
+)).thenReturn(Map(tp -> LogAppendResult(new LogAppendInfo(
+  Optional.empty(),
+  10,
+  OptionalInt.empty(),
+  RecordBatch.NO_TIMESTAMP,
+  -1L,
+  RecordBatch.NO_TIMESTAMP,
+  -1L,
+  RecordConversionStats.EMPTY,
+  CompressionType.NONE,
+  CompressionType.NONE,
+  -1,
+  -1,
+  false,
+  -1L,
+  Collections.emptyList(),
+  "",
+  LeaderHwChange.INCREASED
+
+
+val records = List(
+  ("k0", "v0"),
+  ("k1", "v1"),
+  ("k2", "v2"),
+)
+
+assertEquals(11, partitionRecordWriter.append(
+  tp,
+  records.asJava
+))
+
+verify(replicaManag

[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213271843


##
core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala:
##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.{LogAppendResult, ReplicaManager, RequestLocal}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, 
RecordTooLargeException}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
RecordBatch, RecordConversionStats}
+import org.apache.kafka.common.utils.{MockTime, Time}
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LeaderHwChange, 
LogAppendInfo, LogConfig}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Test
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify, when}
+
+import java.nio.charset.Charset
+import java.util.{Collections, Optional, OptionalInt, Properties}
+import scala.jdk.CollectionConverters._
+
+class StringKeyValueSerializer extends PartitionWriter.Serializer[(String, 
String)] {
+  override def serializeKey(record: (String, String)): Array[Byte] = {
+record._1.getBytes(Charset.defaultCharset())
+  }
+
+  override def serializeValue(record: (String, String)): Array[Byte] = {
+record._2.getBytes(Charset.defaultCharset())
+  }
+}
+
+class PartitionWriterImplTest {
+  @Test
+  def testRegisterDeregisterListener(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val partitionRecordWriter = new PartitionWriterImpl(
+  replicaManager,
+  new StringKeyValueSerializer(),
+  CompressionType.NONE,
+  Time.SYSTEM
+)
+
+val listener = new PartitionWriter.Listener {
+  override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): 
Unit = {}
+}
+
+partitionRecordWriter.registerListener(tp, listener)
+verify(replicaManager).maybeAddListener(tp, new ListenerAdaptor(listener))
+
+partitionRecordWriter.deregisterListener(tp, listener)
+verify(replicaManager).removeListener(tp, new ListenerAdaptor(listener))
+
+assertEquals(
+  new ListenerAdaptor(listener),
+  new ListenerAdaptor(listener)
+)
+assertEquals(
+  new ListenerAdaptor(listener).hashCode(),
+  new ListenerAdaptor(listener).hashCode()
+)
+  }
+
+  @Test
+  def testWriteRecords(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val time = new MockTime()
+val partitionRecordWriter = new PartitionWriterImpl(
+  replicaManager,
+  new StringKeyValueSerializer(),
+  CompressionType.NONE,
+  time
+)
+
+when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
+  Collections.emptyMap(),
+  new Properties()
+)))
+
+val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+  ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+when(replicaManager.appendToLocalLog(
+  ArgumentMatchers.eq(true),
+  ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+  recordsCapture.capture(),
+  ArgumentMatchers.eq(1),
+  ArgumentMatchers.eq(RequestLocal.NoCaching)
+)).thenReturn(Map(tp -> LogAppendResult(new LogAppendInfo(
+  Optional.empty(),
+  10,
+  OptionalInt.empty(),
+  RecordBatch.NO_TIMESTAMP,
+  -1L,
+  RecordBatch.NO_TIMESTAMP,
+  -1L,
+  RecordConversionStats.EMPTY,
+  CompressionType.NONE,
+  CompressionType.NONE,
+  -1,
+  -1,
+  false,
+  -1L,
+  Collections.emptyList(),
+  "",
+  LeaderHwChange.INCREASED
+
+
+val records = List(
+  ("k0", "v0"),
+  ("k1", "v1"),
+  ("k2", "v2"),
+)
+
+assertEquals(11, partitionRecordWriter.append(
+  tp,
+  records.asJava
+))
+
+verify(replicaManag

[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213268934


##
core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.cluster.PartitionListener
+import kafka.server.{ReplicaManager, RequestLocal}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
TimestampType}
+import org.apache.kafka.common.record.Record.EMPTY_HEADERS
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter
+import org.apache.kafka.storage.internals.log.AppendOrigin
+
+import java.nio.ByteBuffer
+import java.util
+import scala.collection.Map
+
+private[group] class ListenerAdaptor(
+  val listener: PartitionWriter.Listener
+) extends PartitionListener {
+  override def onHighWatermarkUpdated(
+tp: TopicPartition,
+offset: Long
+  ): Unit = {
+listener.onHighWatermarkUpdated(tp, offset)
+  }
+
+  override def equals(that: Any): Boolean = that match {
+case other: ListenerAdaptor => listener.equals(other.listener)
+case _ => false
+  }
+
+  override def hashCode(): Int = {
+listener.hashCode()
+  }
+
+  override def toString: String = {
+s"ListenerAdaptor(listener=$listener)"
+  }
+}
+
+class PartitionWriterImpl[T](
+  replicaManager: ReplicaManager,
+  serializer: PartitionWriter.Serializer[T],
+  compressionType: CompressionType,
+  time: Time
+) extends PartitionWriter[T] {
+
+  override def registerListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener))
+  }
+
+  override def deregisterListener(
+tp: TopicPartition,
+listener: PartitionWriter.Listener
+  ): Unit = {
+replicaManager.removeListener(tp, new ListenerAdaptor(listener))
+  }
+
+  override def append(
+tp: TopicPartition,
+records: util.List[T]
+  ): Long = {
+if (records.isEmpty) {
+  throw new IllegalStateException("records must be non-empty.")
+}
+
+replicaManager.getLogConfig(tp) match {
+  case Some(logConfig) =>
+val magic = logConfig.recordVersion.value
+val maxBatchSize = logConfig.maxMessageSize
+val currentTimeMs = time.milliseconds()
+
+val recordsBuilder = MemoryRecords.builder(
+  ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+  magic,
+  compressionType,
+  TimestampType.CREATE_TIME,
+  0L,
+  maxBatchSize
+)
+
+records.forEach { record =>
+  val keyBytes = serializer.serializeKey(record)
+  val valueBytes = serializer.serializeValue(record)
+
+  if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, 
EMPTY_HEADERS)) {
+recordsBuilder.append(
+  currentTimeMs,
+  keyBytes,
+  valueBytes,
+  EMPTY_HEADERS
+)
+  } else {
+throw new RecordTooLargeException(s"Message batch size is 
${recordsBuilder.estimatedSizeInBytes()} bytes " +
+  s"in append to partition $tp which exceeds the maximum 
configured size of $maxBatchSize.")
+  }
+}
+
+val appendResults = replicaManager.appendToLocalLog(

Review Comment:
   Yeah, I agree. The alternative would be to pass it via the method, but this 
would mean hardcoding it somewhere else. Is that better?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-01 Thread via GitHub


urbandan commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1572188562

   @viktorsomogyi Could you please review this fix?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13675: KAFKA-14462; [14/N] Add PartitionWriter

2023-06-01 Thread via GitHub


dajac commented on code in PR #13675:
URL: https://github.com/apache/kafka/pull/13675#discussion_r1213267178


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Append the messages to the local replica logs
+   * Append the messages to the local replica logs. 
ReplicaManager#appendRecords should usually be
+   * used instead of this method.

Review Comment:
   I did not use it here because it puts items onto the action queue and I 
don't want/need that in my case. Let me circle back on this to see if I can do 
better. I wrote this a while ago and I admit that I went with the easiest 
solution back then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan opened a new pull request, #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-01 Thread via GitHub


urbandan opened a new pull request, #13796:
URL: https://github.com/apache/kafka/pull/13796

   …atches to complete before resetting the sequence number
   
   The idempotent producer resets the sequence number when a batch fails with a 
non-retriable error. This can violate the idempotent guarantee if there are 
preceding, in-flight requests which are being retried, as their producer epoch 
and sequence number get rewritten, effectively granting those messages a new 
"identity".
   
   Instead, the producer should wait for the preceding, retried batches to 
complete before resetting the sequence number. This ensures that the sequence 
numbers can only get reset after the preceding batches are definitely completed.
   
   By calling markSequenceUnresolved instead of requestEpochBumpForPartition, 
draining batches from the partition is halted, and the epoch bump is delayed 
until all in-flight requests are completed.
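   
   As a rough sketch of the intended ordering (the class and method names below 
are hypothetical, not the actual Sender/TransactionManager code), the epoch 
bump and sequence reset are deferred until no preceding batch is still in 
flight:
   
   final class DeferredSequenceResetSketch {
       private int inflightBatches = 2;      // preceding batches still being retried
       private boolean epochBumpRequested = false;
       private int producerEpoch = 0;
       private int nextSequence = 42;
   
       void onFatalBatchError() {
           // Instead of resetting immediately, mark the reset as pending.
           epochBumpRequested = true;
           maybeBumpEpochAndResetSequence();
       }
   
       void onInflightBatchCompleted() {
           inflightBatches--;
           maybeBumpEpochAndResetSequence();
       }
   
       private void maybeBumpEpochAndResetSequence() {
           // Bump only once every preceding in-flight batch has completed, so
           // retried batches keep their original epoch/sequence identity.
           if (epochBumpRequested && inflightBatches == 0) {
               producerEpoch++;
               nextSequence = 0;
               epochBumpRequested = false;
           }
       }
   
       public static void main(String[] args) {
           DeferredSequenceResetSketch s = new DeferredSequenceResetSketch();
           s.onFatalBatchError();          // no reset yet: 2 batches in flight
           s.onInflightBatchCompleted();   // still 1 in flight
           s.onInflightBatchCompleted();   // now the epoch bump and reset happen
           System.out.println(s.producerEpoch + " " + s.nextSequence); // 1 0
       }
   }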
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


jeffkbkim commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213254844


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: 
").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();

Review Comment:
   makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration

2023-06-01 Thread via GitHub


jsancio commented on code in PR #13788:
URL: https://github.com/apache/kafka/pull/13788#discussion_r1213240584


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -33,6 +33,92 @@
 
 
 public class PartitionRegistration {
+
+/**
+ * A builder class which creates a PartitionRegistration.
+ */
+static public class Builder {
+private int[] replicas;
+private int[] isr;
+private int[] removingReplicas = Replicas.NONE;
+private int[] addingReplicas = Replicas.NONE;
+private Integer leader;
+private LeaderRecoveryState leaderRecoveryState;
+private Integer leaderEpoch;
+private Integer partitionEpoch;
+
+public Builder setReplicas(int[] replicas) {
+this.replicas = replicas;
+return this;
+}
+
+public Builder setIsr(int[] isr) {
+this.isr = isr;
+return this;
+}
+
+public Builder setRemovingReplicas(int[] removingReplicas) {
+this.removingReplicas = removingReplicas;
+return this;
+}
+
+public Builder setAddingReplicas(int[] addingReplicas) {
+this.addingReplicas = addingReplicas;
+return this;
+}
+
+public Builder setLeader(Integer leader) {
+this.leader = leader;
+return this;
+}
+
+public Builder setLeaderRecoveryState(LeaderRecoveryState 
leaderRecoveryState) {
+this.leaderRecoveryState = leaderRecoveryState;
+return this;
+}
+
+public Builder setLeaderEpoch(Integer leaderEpoch) {
+this.leaderEpoch = leaderEpoch;
+return this;
+}
+
+public Builder setPartitionEpoch(Integer partitionEpoch) {
+this.partitionEpoch = partitionEpoch;
+return this;
+}
+
+public PartitionRegistration build() {
+if (replicas == null) {
+throw new IllegalStateException("You must set replicas.");
+} else if (isr == null) {
+throw new IllegalStateException("You must set isr.");
+} else if (removingReplicas == null) {
+throw new IllegalStateException("You must set removing 
replicas.");
+} else if (addingReplicas == null) {
+throw new IllegalStateException("You must set adding 
replicas.");
+} else if (leader == null) {
+throw new IllegalStateException("You must set leader.");
+} else if (leaderRecoveryState == null) {
+throw new IllegalStateException("You must set leader recovery 
state.");

Review Comment:
   I should have mentioned in my previous review but how about having a default 
of this state too? The default should be `RECOVERED`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #13758: KAFKA-15010 ZK migration failover support

2023-06-01 Thread via GitHub


mumrah merged PR #13758:
URL: https://github.com/apache/kafka/pull/13758


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tombentley commented on a diff in pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft

2023-06-01 Thread via GitHub


tombentley commented on code in PR #13792:
URL: https://github.com/apache/kafka/pull/13792#discussion_r1213226340


##
docs/upgrade.html:
##
@@ -59,6 +59,65 @@ Notable changes in 3
 
 
 
+Upgrading 
ZooKeeper-based clusters
+If you are upgrading from a version prior to 2.1.x, please see the 
note below about the change to the schema used to store consumer offsets.

Review Comment:
   "the note below" is rather vague. Can you use an `<a>` to link to it 
directly?



##
docs/upgrade.html:
##
@@ -59,6 +59,65 @@ Notable changes in 3
 
 
 
+Upgrading 
ZooKeeper-based clusters
+If you are upgrading from a version prior to 2.1.x, please see the 
note below about the change to the schema used to store consumer offsets.
+Once you have changed the inter.broker.protocol.version to the latest 
version, it will not be possible to downgrade to a version prior to 2.1.
+
+For a rolling upgrade:
+
+
+Update server.properties on all brokers and add the following 
properties. CURRENT_KAFKA_VERSION refers to the version you
+are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the 
message format version currently in use. If you have previously
+overridden the message format version, you should keep its current 
value. Alternatively, if you are upgrading from a version prior
+to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to 
match CURRENT_KAFKA_VERSION.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
3.4, 3.3, etc.)
+log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  
(See potential performance impact
+following the upgrade for the details on what this 
configuration does.)
+
+If you are upgrading from version 0.11.0.x or above, and you have 
not overridden the message format, then you only need to override
+the inter-broker protocol version.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 
3.4, 3.3, etc.)
+
+
+Upgrade the brokers one at a time: shut down the broker, update 
the code, and restart it. Once you have done so, the
+brokers will be running the latest version and you can verify that 
the cluster's behavior and performance meets expectations.
+It is still possible to downgrade at this point if there are any 
problems.
+
+Once the cluster's behavior and performance has been verified, 
bump the protocol version by editing
+inter.broker.protocol.version and setting it to 
3.5.
+
+Restart the brokers one by one for the new protocol version to 
take effect. Once the brokers begin using the latest
+protocol version, it will no longer be possible to downgrade the 
cluster to an older version.
+
+If you have overridden the message format version as instructed 
above, then you need to do one more rolling restart to
+upgrade it to its latest version. Once all (or most) consumers 
have been upgraded to 0.11.0 or later,
+change log.message.format.version to 3.5 on each broker and 
restart them one by one. Note that the older Scala clients,
+which are no longer maintained, do not support the message format 
introduced in 0.11, so to avoid conversion costs
+(or to take advantage of exactly once semantics),
+the newer Java clients must be used.
+
+
+
+Upgrading 
KRaft-based clusters
+If you are upgrading from a version prior to 3.3.0, please see the 
note below. Once you have changed the metadata.version to the latest version, 
it will not be possible to downgrade to a version prior to 3.3-IV0.

Review Comment:
   "the note below" is rather vague. Can you use an `<a>` to link to it 
directly?



##
docs/upgrade.html:
##
@@ -59,6 +59,65 @@ Notable changes in 3
 
 
 
+Upgrading 
ZooKeeper-based clusters
+If you are upgrading from a version prior to 2.1.x, please see the 
note below about the change to the schema used to store consumer offsets.
+Once you have changed the inter.broker.protocol.version to the latest 
version, it will not be possible to downgrade to a version prior to 2.1.
+
+For a rolling upgrade:
+
+
+Update server.properties on all brokers and add the following 
properties. CURRENT_KAFKA_VERSION refers to the version you
+are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the 
message format version currently in use. If you have previously
+overridden the message format version, you should keep its current 
value. Alternatively, if you are upgrading from a version prior
+to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to 
match CURRENT_KAFKA_VERSION.
+
+inter.b

[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213232889


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: 
").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();

Review Comment:
   It should not, but it could. Logging twice in this case is not an issue, I 
think. I would keep it as it is. beginShutdown is not necessarily called via 
close.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213222820


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+/**
+ * The base event type used by all events processed in the
+ * coordinator runtime.
+ */
+public interface CoordinatorEvent extends EventAccumulator.Event {
+/**

Review Comment:
   I think that we do both in the code base but I can add it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


jeffkbkim commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213221009


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: 
").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();

Review Comment:
   Can MultiThreadedEventProcessor#close() be called multiple times? We have 
logging there as well. If beginShutdown is only called through close(), then 
maybe we can synchronize there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


jeffkbkim commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213191767


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+/**
+ * The base event type used by all events processed in the
+ * coordinator runtime.
+ */
+public interface CoordinatorEvent extends EventAccumulator.Event {
+/**

Review Comment:
   nit: should we have a newline here?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(value = 60)
+public class MultiThreadedEventProcessorTest {
+
+private static class FutureEvent implements CoordinatorEvent {
+private final int key;
+private final CompletableFuture future;
+private final Supplier supplier;
+
+FutureEvent(
+int key,
+Supplier supplier
+) {
+this.key = key;
+this.future = new CompletableFuture<>();
+this.supplier = supplier;
+}
+
+@Override
+public void run() {
+future.complete(supplier.get());
+}
+
+@Override
+public void complete(Throwable ex) {
+future.completeExceptionally(ex);
+}
+
+@Override
+public Integer key() {
+return key;
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+@Override
+public String toString() {
+return "FutureEvent(key=" + key + ")";
+}
+}
+
+@Test
+public void testCreateAndClose() throws Exception {
+CoordinatorEventProcessor eventProcessor = new 
MultiThreadedEventProcessor(
+new LogContext(),
+"event-processor-",
+2
+);
+eventProcessor.close();
+}
+
+@Test
+public void testEventsAreProcessed() throws Exception {

Review Comment:
   thanks, looks good!



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for 

[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213213734


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: 
").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();

Review Comment:
   Removed the lock. I made `beginShutdown` `synchronized` instead. This is needed because `shuttingDown` is accessed twice for the logging message, as you pointed out.
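
   For illustration only, a minimal sketch (not the PR's actual code) of what the change described above could look like, reusing the field names from the quoted diff:

```java
// Hypothetical sketch, assuming it lives inside MultiThreadedEventProcessor
// from the diff above (which supplies log, shuttingDown and accumulator).
// beginShutdown is synchronized so the flag is set and logged at most once,
// without the separate ReentrantLock.
public synchronized void beginShutdown() {
    if (shuttingDown) {
        log.debug("Event processor is already shutting down.");
        return;
    }
    log.info("Shutting down event processor.");
    shuttingDown = true;   // volatile flag read by the processing threads
    accumulator.close();   // assumption: this wakes up threads blocked in poll()
}
```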



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13769: MINOR: Covering all epoch cases in add partitions to txn manager

2023-06-01 Thread via GitHub


dajac commented on code in PR #13769:
URL: https://github.com/apache/kafka/pull/13769#discussion_r1213186879


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -36,45 +36,58 @@ class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransac
   val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
 
 
-class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time)
   extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
-  
+
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
-  
+
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
 nodesToTransactions.synchronized {
   // Check if we have already have either node or individual transaction. 
Add the Node if it isn't there.
-  val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+  val existingNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
 new TransactionDataAndCallbacks(
   new AddPartitionsToTxnTransactionCollection(1),
   mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-  val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
-
-  // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
-  // reconnected so return the retriable network exception.
-  if (currentTransactionData != null) {
-val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
-  Errors.INVALID_PRODUCER_EPOCH
-else 
-  Errors.NETWORK_EXCEPTION
-val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-currentTransactionData.topics().forEach { topic =>
-  topic.partitions().forEach { partition =>
-topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
-  }
+  val existingTransactionData = 
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+  // There are 3 cases if we already have existing data
+  // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH 
for existing data since it is fenced
+  // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for 
existing data, since the client is likely retrying and we want another 
retriable exception
+  // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH 
for the incoming data since it is fenced, do not add incoming data to verify
+  if (existingTransactionData != null) {
+if (existingTransactionData.producerEpoch() <= 
transactionData.producerEpoch()) {

Review Comment:
   nit: `()` can be omitted, e.g. `existingTransactionData.producerEpoch` instead of `existingTransactionData.producerEpoch()`. There are a few other cases below.
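
   Separately, for readers following the epoch handling, a hedged illustration (in Java, whereas the PR's code is Scala) of the three cases enumerated in the diff's comment above:

```java
// Illustrative only; mirrors the three cases described in the diff's comment
// for an already-tracked transaction entry versus an incoming request.
static String classifyEpochs(short existingEpoch, short incomingEpoch) {
    if (incomingEpoch > existingEpoch) {
        // case 1: the existing data is fenced
        return "INVALID_PRODUCER_EPOCH for the existing data";
    } else if (incomingEpoch == existingEpoch) {
        // case 2: most likely a client retry
        return "NETWORK_EXCEPTION for the existing data";
    } else {
        // case 3: the incoming data is fenced and is not added for verification
        return "INVALID_PRODUCER_EPOCH for the incoming data";
    }
}
```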



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -36,45 +36,58 @@ class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransac
   val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
 
 
-class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time)
   extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
-  
+
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
-  
+
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
 nodesToTransactions.synchronized {
   // Check if we have already have either node or individual transaction. 
Add the Node if it isn't there.
-  val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+  val existingNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
 new TransactionDataAndCallbacks(
   new AddPartitionsToTxnTransactionCollection(1),
   mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-  val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
-
-  // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
-  // reconnected so return the retriable network exceptio

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


jeffkbkim commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213190623


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: 
").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();

Review Comment:
   Is the concern more about logging multiple times? I don't see any harm in calling `accumulator.close` multiple times; the same applies to `MultiThreadedEventProcessor#close()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213165026


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link 
EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed 
concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: 
").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();

Review Comment:
   We need a lock in shutdown because we access multiple variables and we don't 
want to trigger the shutdown multiple times.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-06-01 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1213163941


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A {{@link CoordinatorEvent}} processor.
+ */
+public interface CoordinatorEventProcessor extends AutoCloseable {

Review Comment:
   Yep. I use a different implementation in unit tests (without threads) and I 
plan to use another one for simulation tests at some point.
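
   As a rough idea of what a threadless implementation could look like (purely hypothetical, assuming the interface exposes an `enqueue(CoordinatorEvent)` method, as the `RejectedExecutionException` import suggests, and that events expose `run()` and `complete(Throwable)`):

```java
import java.util.concurrent.RejectedExecutionException;

// Hypothetical same-thread processor for tests; all names outside the quoted
// diffs (enqueue, run, complete) are assumptions about the interfaces.
public class DirectEventProcessor implements CoordinatorEventProcessor {
    private volatile boolean closed = false;

    @Override
    public void enqueue(CoordinatorEvent event) throws RejectedExecutionException {
        if (closed) {
            throw new RejectedExecutionException("The processor is closed.");
        }
        try {
            event.run();          // run the event on the caller's thread
        } catch (Throwable t) {
            event.complete(t);    // surface the failure to the event's future
        }
    }

    @Override
    public void close() {
        closed = true;
    }
}
```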



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15047) Handle rolling segments when the active segment's retention is breached incase of tiered storage is enabled.

2023-06-01 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15047:
---
Description: 
Active segments are not copied by the remote storage subsystem, but they can still be eligible for retention cleanup.

So, we need to roll the active segment when remote storage is enabled, so that it can be copied by the remote storage subsystem and eventually picked up for retention cleanup.

> Handle rolling segments when the active segment's retention is breached 
> incase of tiered storage is enabled.
> 
>
> Key: KAFKA-15047
> URL: https://issues.apache.org/jira/browse/KAFKA-15047
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> Active segments are not copied by the remote storage subsystem, but they can
> still be eligible for retention cleanup.
> So, we need to roll the active segment when remote storage is enabled, so that
> it can be copied by the remote storage subsystem and eventually picked up for
> retention cleanup.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] novosibman commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-06-01 Thread via GitHub


novosibman commented on PR #13782:
URL: https://github.com/apache/kafka/pull/13782#issuecomment-1572027303

   > Are all the graphs shared for OMB and Kafka Tussle generated for Kafka 
with the fix in this PR?
   The graphs with the fix are the ones noted in the first description comment, marked with the `kafka_2.13-3.6.0-snapshot-fix` label.
   
   The other graphs in the later comment are examples of how rolling affects results on different configurations and benchmarks, using a regular Kafka release.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-01 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1572025066

   > No, you are correct, I had another look at the neighbouring code after I 
wrote my previous comment and I agree that this appears to not cause any 
leadership movements. Okay, I am happy to give my approval. Can you rebase on 
the latest trunk so test can be reran?
   
   Hi, I have merged the latest trunk into the PR branch and re-ran the unit tests. The remaining failures appear to be unrelated to this change. @clolov 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-01 Thread via GitHub


dajac opened a new pull request, #13795:
URL: https://github.com/apache/kafka/pull/13795

   This PR temporarily includes https://github.com/apache/kafka/pull/13666, https://github.com/apache/kafka/pull/13675, https://github.com/apache/kafka/pull/13793 and https://github.com/apache/kafka/pull/13794.
   
   This patch introduces the CoordinatorRuntime, a framework which encapsulates all the common features required to build a coordinator such as the group coordinator. Please refer to the javadoc of that class for the details.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213113458


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1383,20 +1421,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
   }
 
+  private[log] def localRetentionMs: Long = {
+if (config.remoteLogConfig.remoteStorageEnable) 
config.remoteLogConfig.localRetentionMs else config.retentionMs
+  }
+
   private def deleteRetentionMsBreachedSegments(): Int = {
-if (config.retentionMs < 0) return 0
+val retentionMs = localRetentionMs

Review Comment:
   This is addressed with the latest commits. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213112946


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -154,16 +154,41 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
 initializePartitionMetadata()
 updateLogStartOffset(logStartOffset)
+updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+if (!remoteLogEnabled())
+  logStartOffset = localLogStartOffset()

Review Comment:
   We already set the localLogStartOffset to the max of the passed logStartOffset and the first segment's base offset (for example, with logStartOffset = 100 and a first segment base offset of 80, localLogStartOffset becomes 100).
   When remote log is not enabled, `logStartOffset` is set to the `localLogStartOffset` computed above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13794: KAFKA-14462; [16/N] Add CoordinatorLoader and Replayable interfaces

2023-06-01 Thread via GitHub


dajac opened a new pull request, #13794:
URL: https://github.com/apache/kafka/pull/13794

   As the title says.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13793: KAFKA-14462; [15/N] Make Result generic and rename it

2023-06-01 Thread via GitHub


dajac opened a new pull request, #13793:
URL: https://github.com/apache/kafka/pull/13793

   As the title says.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107956


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+private long remainingBreachedSize = 0L;
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(long remainingBreachedSize) {
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size > 0
+if (checkSizeRetention && remainingBreachedSize > 0) {
+remainingBreachedSize -= x.segmentSizeInBytes();
+return remainingBreachedSize >= 0;
+} else return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+"${log.config.retentionSize} breach. Log size 
after deletion will be " +
+"${remainingBreachedSize + 
log.config.retentionSize}.");
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+// No need to update the logStartOffset.
+return isSegmentDeleted;
+}
+
+// There are two cases:
+// 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the

Review Comment:
   Updated the comment to make it clearer. 
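
   As an aside, a worked illustration (hypothetical numbers, not from the PR) of the size-based retention predicate shown in the diff above, assuming `remainingBreachedSize` starts as the total log size minus the configured retention size:

```java
// Oldest segments are visited first; a segment is deleted while the running
// breach is still non-negative after subtracting its size.
long remainingBreachedSize = 1_500_000_000L - 1_000_000_000L;   // 500 MB over retention
long[] segmentSizesOldestFirst = {200_000_000L, 200_000_000L, 200_000_000L};

for (long segmentSize : segmentSizesOldestFirst) {
    if (remainingBreachedSize <= 0) {
        break;                                   // retention no longer breached
    }
    remainingBreachedSize -= segmentSize;
    boolean deleteSegment = remainingBreachedSize >= 0;
    System.out.println("delete=" + deleteSegment + ", remaining=" + remainingBreachedSize);
    // with these numbers: the first two segments are deleted, the third is kept
}
```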



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213107255


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+private long remainingBreachedSize = 0L;
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(long remainingBreachedSize) {
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size > 0
+if (checkSizeRetention && remainingBreachedSize > 0) {
+remainingBreachedSize -= x.segmentSizeInBytes();
+return remainingBreachedSize >= 0;
+} else return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+"${log.config.retentionSize} breach. Log size 
after deletion will be " +
+"${remainingBreachedSize + 
log.config.retentionSize}.");
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+// No need to update the logStartOffset.
+return isSegmentDeleted;
+}
+
+// There are two cases:
+// 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+//leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+// 2) To remove the unreferenced segments.
+private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+if (isSegmentDeleted) {
+logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+}
+
+// No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+return isSegmentDeleted;
+}
+
+private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate predicate)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+if (predicate.test(segmentMetadata)) {
+// Publish delete segment started event.
+remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+// Delete the segment in remote storage.
+
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+// Publish delete segment finished event.
+remoteLogMe

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213102668


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+private long remainingBreachedSize = 0L;
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(long remainingBreachedSize) {
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size > 0
+if (checkSizeRetention && remainingBreachedSize > 0) {
+remainingBreachedSize -= x.segmentSizeInBytes();
+return remainingBreachedSize >= 0;
+} else return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+"${log.config.retentionSize} breach. Log size 
after deletion will be " +
+"${remainingBreachedSize + 
log.config.retentionSize}.");
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+// No need to update the logStartOffset.
+return isSegmentDeleted;
+}
+
+// There are two cases:
+// 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+//leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+// 2) To remove the unreferenced segments.
+private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+if (isSegmentDeleted) {
+logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+}
+
+// No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+return isSegmentDeleted;
+}
+
+private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate predicate)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+if (predicate.test(segmentMetadata)) {
+// Publish delete segment started event.
+remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+// Delete the segment in remote storage.
+
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+// Publish delete segment finished event.
+remoteLogMe

[GitHub] [kafka] satishd commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-01 Thread via GitHub


satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1213100593


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+private long remainingBreachedSize = 0L;
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(long remainingBreachedSize) {
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size > 0
+if (checkSizeRetention && remainingBreachedSize > 0) {
+remainingBreachedSize -= x.segmentSizeInBytes();
+return remainingBreachedSize >= 0;
+} else return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+"${log.config.retentionSize} breach. Log size 
after deletion will be " +
+"${remainingBreachedSize + 
log.config.retentionSize}.");
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+// No need to update the logStartOffset.
+return isSegmentDeleted;
+}
+
+// There are two cases:
+// 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+//leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+// 2) To remove the unreferenced segments.
+private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+if (isSegmentDeleted) {
+logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+}
+
+// No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+return isSegmentDeleted;
+}
+
+private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate predicate)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+if (predicate.test(segmentMetadata)) {
+// Publish delete segment started event.
+remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+// Delete the segment in remote storage.
+
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+// Publish delete segment finished event.
+remoteLogMe

[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-06-01 Thread via GitHub


Hangleton commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1571989946

   Hello David (@dajac), I was discussing this with Christo today as part of 
his work on the OffsetFetch API. Would you like this PR on OffsetCommit to be 
split to make the review easier and reduce risks?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15047) Handle rolling segments when the active segment's retention is breached incase of tiered storage is enabled.

2023-06-01 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-15047:
--

 Summary: Handle rolling segments when the active segment's 
retention is breached incase of tiered storage is enabled.
 Key: KAFKA-15047
 URL: https://issues.apache.org/jira/browse/KAFKA-15047
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Kamal Chandraprakash






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gr-m-9 commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-06-01 Thread via GitHub


gr-m-9 commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1212990392


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -447,6 +482,19 @@ public void close() {
 
 @Override
 public void close(Duration timeout) {
+if (timeout.toMillis() < 0)
+throw new IllegalArgumentException("The timeout cannot be 
negative.");
+try {
+if (!closed) {
+close(timeout, false);
+}
+} finally {
+closed = true;

Review Comment:
   Could you explain why `closed` is a plain `boolean` while `shouldWakeup` is an `AtomicBoolean`?
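
   For context on the distinction being asked about (illustration only, not code from the PR): an `AtomicBoolean` supports atomic read-modify-write from multiple threads, while a plain field is only safe when a single thread ever mutates it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustration only: compareAndSet lets exactly one of several concurrent
// wakeup() callers observe the false -> true transition.
class WakeupFlagExample {
    private final AtomicBoolean shouldWakeup = new AtomicBoolean(false);

    void wakeup() {
        if (shouldWakeup.compareAndSet(false, true)) {
            System.out.println("wakeup requested");
        }
    }
}
```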



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -114,14 +119,14 @@ public PrototypeAsyncConsumer(final ConsumerConfig config,
   final Deserializer valueDeserializer) {
 this.time = Time.SYSTEM;
 GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(config,
-GroupRebalanceConfig.ProtocolType.CONSUMER);
+GroupRebalanceConfig.ProtocolType.CONSUMER);
 this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
 this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
 this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
 // If group.instance.id is set, we will append it to the log context.
 if (groupRebalanceConfig.groupInstanceId.isPresent()) {
 logContext = new LogContext("[Consumer instanceId=" + 
groupRebalanceConfig.groupInstanceId.get() +
-", clientId=" + clientId + ", groupId=" + 
groupId.orElse("null") + "] ");
+", clientId=" + clientId + ", groupId=" + 
groupId.orElse("null") + "] ");

Review Comment:
   Creation of such instances should be done in separate methods.
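
   For instance, a hedged sketch of the kind of extraction being suggested (names are illustrative, not from the PR):

```java
// Hypothetical helper: build the LogContext prefix in one place instead of in
// the constructor body. Assumes java.util.Optional and
// org.apache.kafka.common.utils.LogContext are imported.
private static LogContext createLogContext(Optional<String> groupInstanceId,
                                           String clientId,
                                           Optional<String> groupId) {
    String groupIdLabel = groupId.orElse("null");
    String prefix = groupInstanceId
        .map(id -> "[Consumer instanceId=" + id + ", clientId=" + clientId + ", groupId=" + groupIdLabel + "] ")
        .orElse("[Consumer clientId=" + clientId + ", groupId=" + groupIdLabel + "] ");
    return new LogContext(prefix);
}
```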



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] andymg3 commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration

2023-06-01 Thread via GitHub


andymg3 commented on code in PR #13788:
URL: https://github.com/apache/kafka/pull/13788#discussion_r1212980561


##
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java:
##
@@ -77,39 +77,75 @@ public void testChangeRecordIsNoOp() {
 );
 }
 
-private final static PartitionRegistration FOO = new PartitionRegistration(
-new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE,
-1, LeaderRecoveryState.RECOVERED, 100, 200);
+private static final PartitionRegistration FOO;
+
+static {
+try {
+FOO = new PartitionRegistration.Builder().
+setReplicas(new int[] {2, 1, 3}).
+setIsr(new int[] {2, 1, 3}).
+setRemovingReplicas(Replicas.NONE).
+setAddingReplicas(Replicas.NONE).

Review Comment:
   Done here and all the other places.



##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -33,6 +33,92 @@
 
 
 public class PartitionRegistration {
+
+/**
+ * A builder class which creates a PartitionRegistration.
+ */
+static public class Builder {
+private int[] replicas;
+private int[] isr;
+private int[] removingReplicas;
+private int[] addingReplicas;
+private Integer leader;
+private LeaderRecoveryState leaderRecoveryState;
+private Integer leaderEpoch;
+private Integer partitionEpoch;
+
+public Builder setReplicas(int[] replicas) {
+this.replicas = replicas;
+return this;
+}
+
+public Builder setIsr(int[] isr) {
+this.isr = isr;
+return this;
+}
+
+public Builder setRemovingReplicas(int[] removingReplicas) {
+this.removingReplicas = removingReplicas;
+return this;
+}
+
+public Builder setAddingReplicas(int[] addingReplicas) {
+this.addingReplicas = addingReplicas;
+return this;
+}
+
+public Builder setLeader(Integer leader) {
+this.leader = leader;
+return this;
+}
+
+public Builder setLeaderRecoveryState(LeaderRecoveryState 
leaderRecoveryState) {
+this.leaderRecoveryState = leaderRecoveryState;
+return this;
+}
+
+public Builder setLeaderEpoch(Integer leaderEpoch) {
+this.leaderEpoch = leaderEpoch;
+return this;
+}
+
+public Builder setPartitionEpoch(Integer partitionEpoch) {
+this.partitionEpoch = partitionEpoch;
+return this;
+}
+
+public PartitionRegistration build() throws Exception {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] andymg3 commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration

2023-06-01 Thread via GitHub


andymg3 commented on code in PR #13788:
URL: https://github.com/apache/kafka/pull/13788#discussion_r1212980304


##
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java:
##
@@ -77,39 +77,75 @@ public void testChangeRecordIsNoOp() {
 );
 }
 
-private final static PartitionRegistration FOO = new PartitionRegistration(
-new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE,
-1, LeaderRecoveryState.RECOVERED, 100, 200);
+private static final PartitionRegistration FOO;
+
+static {
+try {
+FOO = new PartitionRegistration.Builder().
+setReplicas(new int[] {2, 1, 3}).
+setIsr(new int[] {2, 1, 3}).
+setRemovingReplicas(Replicas.NONE).
+setAddingReplicas(Replicas.NONE).
+setLeader(1).
+setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
+setLeaderEpoch(100).
+setPartitionEpoch(200).
+build();
+} catch (Exception e) {
+throw new RuntimeException(e);
+}

Review Comment:
   Yeah. I've changed the signature to not have `throws Exception` so I've 
removed the catches.
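
   For reference, a sketch of how the static initializer from the quoted test could then look once `build()` no longer declares `throws Exception` (based on the quoted code; the simplification itself is assumed):

```java
// Same builder calls as in the quoted test, but without the try/catch that was
// only needed while build() declared `throws Exception`.
private static final PartitionRegistration FOO = new PartitionRegistration.Builder().
    setReplicas(new int[] {2, 1, 3}).
    setIsr(new int[] {2, 1, 3}).
    setRemovingReplicas(Replicas.NONE).
    setAddingReplicas(Replicas.NONE).
    setLeader(1).
    setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
    setLeaderEpoch(100).
    setPartitionEpoch(200).
    build();
```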



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] andymg3 commented on a diff in pull request #13788: KAFKA-14791: Create a builder for PartitionRegistration

2023-06-01 Thread via GitHub


andymg3 commented on code in PR #13788:
URL: https://github.com/apache/kafka/pull/13788#discussion_r1212978727


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -33,6 +33,92 @@
 
 
 public class PartitionRegistration {
+
+/**
+ * A builder class which creates a PartitionRegistration.
+ */
+static public class Builder {
+private int[] replicas;
+private int[] isr;
+private int[] removingReplicas;
+private int[] addingReplicas;
+private Integer leader;
+private LeaderRecoveryState leaderRecoveryState;
+private Integer leaderEpoch;
+private Integer partitionEpoch;
+
+public Builder setReplicas(int[] replicas) {
+this.replicas = replicas;
+return this;
+}
+
+public Builder setIsr(int[] isr) {
+this.isr = isr;
+return this;
+}
+
+public Builder setRemovingReplicas(int[] removingReplicas) {
+this.removingReplicas = removingReplicas;
+return this;
+}
+
+public Builder setAddingReplicas(int[] addingReplicas) {
+this.addingReplicas = addingReplicas;
+return this;
+}
+
+public Builder setLeader(Integer leader) {
+this.leader = leader;
+return this;
+}
+
+public Builder setLeaderRecoveryState(LeaderRecoveryState 
leaderRecoveryState) {
+this.leaderRecoveryState = leaderRecoveryState;
+return this;
+}
+
+public Builder setLeaderEpoch(Integer leaderEpoch) {
+this.leaderEpoch = leaderEpoch;
+return this;
+}
+
+public Builder setPartitionEpoch(Integer partitionEpoch) {
+this.partitionEpoch = partitionEpoch;
+return this;
+}
+
+public PartitionRegistration build() throws Exception {
+if (replicas == null) {
+throw new IllegalStateException("You must set replicas.");
+} else if (isr == null) {
+throw new IllegalStateException("You must set isr.");
+} else if (removingReplicas == null) {
+throw new IllegalStateException("You must set removing 
replicas.");
+} else if (addingReplicas == null) {
+throw new IllegalStateException("You must set adding 
replicas.");

Review Comment:
   Done. Used `Replicas.NONE`.



##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -57,7 +143,7 @@ public PartitionRegistration(PartitionRecord record) {
 record.partitionEpoch());
 }
 
-public PartitionRegistration(int[] replicas, int[] isr, int[] 
removingReplicas,
+protected PartitionRegistration(int[] replicas, int[] isr, int[] 
removingReplicas,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft

2023-06-01 Thread via GitHub


mimaison commented on PR #13792:
URL: https://github.com/apache/kafka/pull/13792#issuecomment-1571810081

   @tombentley @dajac Can you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gr-m-9 commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-06-01 Thread via GitHub


gr-m-9 commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1212959825


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -114,14 +119,14 @@ public PrototypeAsyncConsumer(final ConsumerConfig config,
   final Deserializer valueDeserializer) {
 this.time = Time.SYSTEM;
 GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(config,
-GroupRebalanceConfig.ProtocolType.CONSUMER);
+GroupRebalanceConfig.ProtocolType.CONSUMER);
 this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
 this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
 this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
 // If group.instance.id is set, we will append it to the log context.
 if (groupRebalanceConfig.groupInstanceId.isPresent()) {
 logContext = new LogContext("[Consumer instanceId=" + 
groupRebalanceConfig.groupInstanceId.get() +
-", clientId=" + clientId + ", groupId=" + 
groupId.orElse("null") + "] ");
+", clientId=" + clientId + ", groupId=" + 
groupId.orElse("null") + "] ");
 } else {
 logContext = new LogContext("[Consumer clientId=" + clientId + ", 
groupId=" + groupId.orElse("null") + "] ");
 }

Review Comment:
   A bit inconsistent: `this.groupId` is used here but plain `groupId.orElse` later; when `this` isn't needed, it shouldn't be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #13792: MINOR: Add 3.5 upgrade steps for ZK and KRaft

2023-06-01 Thread via GitHub


mimaison opened a new pull request, #13792:
URL: https://github.com/apache/kafka/pull/13792

   Moved the ZK and KRaft steps into `h5` sections so they are below the `h4` 
Upgrading to 3.5.0 from any version 0.8.x through 3.4.x section.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


