[jira] [Resolved] (KAFKA-16035) add integration test for ExpiresPerSec and RemoteLogSizeComputationTime metrics

2023-12-21 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16035.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> add integration test for ExpiresPerSec and RemoteLogSizeComputationTime 
> metrics
> ---
>
> Key: KAFKA-16035
> URL: https://issues.apache.org/jira/browse/KAFKA-16035
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.7.0
>
>
> add integration test for ExpiresPerSec and RemoteLogSizeComputationTime 
> metrics
> https://github.com/apache/kafka/pull/15015/commits/517a7c19d5a19bc94f0f79c02a239fd1ff7f6991



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16035: add tests for remoteLogSizeComputationTime/remoteFetchExpiresPerSec metrics [kafka]

2023-12-21 Thread via GitHub


showuon merged PR #15056:
URL: https://github.com/apache/kafka/pull/15056


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16014: Implement RemoteLogSizeBytes [kafka]

2023-12-21 Thread via GitHub


showuon merged PR #15050:
URL: https://github.com/apache/kafka/pull/15050


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16014: Implement RemoteLogSizeBytes [kafka]

2023-12-21 Thread via GitHub


showuon commented on PR #15050:
URL: https://github.com/apache/kafka/pull/15050#issuecomment-1867309312

   Ran CI twice; the failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-21 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-16046:

Labels: streams  (was: )

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-21 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-16046:

Component/s: streams

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-21 Thread Almog Gavra (Jira)
Almog Gavra created KAFKA-16046:
---

 Summary: Stream Stream Joins fail after restoration with 
deserialization exceptions
 Key: KAFKA-16046
 URL: https://issues.apache.org/jira/browse/KAFKA-16046
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Almog Gavra
Assignee: Almog Gavra


Before KIP-954, the `KStreamImplJoin` class would always create non-timestamped 
persistent windowed stores. After that KIP, the default was changed to create 
timestamped stores. This wasn't compatible because, during restoration, 
timestamped stores have their changelog values transformed to prepend the 
timestamp to the value. This caused serialization errors when trying to read 
from the store because the deserializers did not expect the timestamp to be 
prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-21 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799623#comment-17799623
 ] 

Almog Gavra commented on KAFKA-16046:
-

https://github.com/apache/kafka/pull/15061

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] fix stream-stream-join store types [kafka]

2023-12-21 Thread via GitHub


agavra opened a new pull request, #15061:
URL: https://github.com/apache/kafka/pull/15061

   Before #14648, the `KStreamImplJoin` class would always create 
non-timestamped persistent windowed stores. After that PR, the default was 
changed to create timestamped stores. This wasn't compatible because, during 
restoration, timestamped stores have their values transformed to prepend the 
timestamp to the value. This caused serialization errors when trying to read 
from the store because the deserializers did not expect the timestamp to be 
prepended.
   
   To fix this, we allow creating non-timestamped stores using the 
`DslWindowParams`
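   
   A minimal sketch of the incompatibility, assuming a simplified layout in which a timestamped store's changelog value is just an 8-byte timestamp followed by the plain serialized value (the helper below is illustrative, not the actual Streams internals):
   
```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TimestampedRestoreSketch {

    // Hypothetical helper mirroring the idea of a timestamped store: the
    // changelog value carries the record timestamp prepended to the payload.
    static byte[] prependTimestamp(long timestampMs, byte[] plainValue) {
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                .putLong(timestampMs)
                .put(plainValue)
                .array();
    }

    public static void main(String[] args) {
        byte[] plain = "join-value".getBytes(StandardCharsets.UTF_8);
        byte[] timestamped = prependTimestamp(1_703_116_800_000L, plain);

        // A deserializer written for the plain layout now sees 8 extra bytes
        // of timestamp prefix, so it either throws or produces garbage.
        String asExpected = new String(plain, StandardCharsets.UTF_8);
        String corrupted = new String(timestamped, StandardCharsets.UTF_8);
        System.out.println("expected=" + asExpected + ", corrupted=" + corrupted);
    }
}
```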
   
   Testing was done both manually as well as adding a unit test to ensure that 
the stores created are not timestamped.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


philipnee commented on code in PR #15059:
URL: https://github.com/apache/kafka/pull/15059#discussion_r1434559814


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -96,13 +96,13 @@
 import static org.apache.kafka.common.utils.Utils.swallow;
 
 /**
- * A client that consumes records from a Kafka cluster using the {@link 
GroupProtocol#GENERIC generic group protocol}.
+ * A client that consumes records from a Kafka cluster using the {@link 
GroupProtocol#CLASSIC classic group protocol}.
  * In this implementation, all network I/O happens in the thread of the 
application making the call.
  *
  * 
  *
  * Note: per its name, this implementation is left for backward 
compatibility purposes. The updated consumer
- * group protocol (from KIP-848) introduces allows users continue using the 
legacy "generic" group protocol.
+ * group protocol (from KIP-848) introduces allows users continue using the 
legacy "classic" group protocol.

Review Comment:
   there's a bit of grammatical confusion here, perhaps: `group protocol ... 
allows user continue...`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]

2023-12-21 Thread via GitHub


philipnee commented on PR #15035:
URL: https://github.com/apache/kafka/pull/15035#issuecomment-1867003094

   @dajac - thanks for merging the changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16040) Rename `Generic` to `Classic`

2023-12-21 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16040.
-
Resolution: Fixed

> Rename `Generic` to `Classic`
> -
>
> Key: KAFKA-16040
> URL: https://issues.apache.org/jira/browse/KAFKA-16040
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 3.7.0
>
>
> People have raised concerns about using {{Generic}} as a name to designate 
> the old rebalance protocol. We considered using {{Legacy}} but discarded it 
> because there are still applications, such as Connect, using the old 
> protocol. We settled on using {{Classic}} for the {{Classic Rebalance 
> Protocol}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


dajac merged PR #15059:
URL: https://github.com/apache/kafka/pull/15059


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


dajac commented on PR #15059:
URL: https://github.com/apache/kafka/pull/15059#issuecomment-1866963946

   As it is really hard to get a clean build due to OOM errors (also in trunk), 
I have verified the build locally. I am going to merge it to trunk and 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16045) ZkMigrationIntegrationTest.testMigrateTopicDeletion flaky

2023-12-21 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-16045:
---
Labels: flaky-test  (was: )

> ZkMigrationIntegrationTest.testMigrateTopicDeletion flaky
> -
>
> Key: KAFKA-16045
> URL: https://issues.apache.org/jira/browse/KAFKA-16045
> Project: Kafka
>  Issue Type: Test
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I'm seeing ZkMigrationIntegrationTest.testMigrateTopicDeletion fail for many 
> builds. I believe it is also causing a thread leak, because on most runs where 
> it fails I see ReplicaManager tests also fail with extra threads. 
> The test always fails with 
> `org.opentest4j.AssertionFailedError: Timed out waiting for topics to be 
> deleted`
> gradle enterprise link:
> [https://ge.apache.org/scans/tests?search.names=Git%20branch[…]lues=trunk=kafka.zk.ZkMigrationIntegrationTest|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.zk.ZkMigrationIntegrationTest]
> recent pr: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15023/18/tests/]
> trunk builds: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2502/tests],
>  
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2501/tests]
>  (edited) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16045) ZkMigrationIntegrationTest.testMigrateTopicDeletion flaky

2023-12-21 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-16045:
--

 Summary: ZkMigrationIntegrationTest.testMigrateTopicDeletion flaky
 Key: KAFKA-16045
 URL: https://issues.apache.org/jira/browse/KAFKA-16045
 Project: Kafka
  Issue Type: Test
Reporter: Justine Olshan


I'm seeing ZkMigrationIntegrationTest.testMigrateTopicDeletion fail for many 
builds. I believe it is also causing a thread leak, because on most runs where 
it fails I see ReplicaManager tests also fail with extra threads. 

The test always fails with 
`org.opentest4j.AssertionFailedError: Timed out waiting for topics to be 
deleted`


gradle enterprise link:

[https://ge.apache.org/scans/tests?search.names=Git%20branch[…]lues=trunk=kafka.zk.ZkMigrationIntegrationTest|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.zk.ZkMigrationIntegrationTest]

recent pr: 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15023/18/tests/]
trunk builds: 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2502/tests],
 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2501/tests]
 (edited) 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


jolshan commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866936613

   jira here: [KAFKA-16045](https://issues.apache.org/jira/browse/KAFKA-16045)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


jolshan commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866933588

   Yeah. I was looking at builds with @dajac for his PR too. The Java 8 one 
here OOM'd. 
   I think the ZkMigrationTests are causing the thread leak for the replica 
manager, since I see ReplicaManager tests failing with this issue on the same 
builds as the ZkMigrationTest failures and not many commonalities otherwise.
   
   I will file a JIRA for that. I will start one more run to get a super clear 
signal and will come back afterwards.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


philipnee commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866886304

   Jenkins doesn't seem to be in a great state.  There are a bunch of 
ZkMigrationIntegrationTest and ReplicaManagerTest failures complaining about timeouts and 
`Found unexpected 1 NonDaemon threads` (there are others as well)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership

2023-12-21 Thread Alexander Aghili (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Aghili updated KAFKA-16027:
-
External issue URL: https://github.com/apache/kafka/pull/15055

> Refactor MetadataTest#testUpdatePartitionLeadership
> ---
>
> Key: KAFKA-16027
> URL: https://issues.apache.org/jira/browse/KAFKA-16027
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Philip Nee
>Assignee: Alexander Aghili
>Priority: Minor
>  Labels: newbie
>
> MetadataTest#testUpdatePartitionLeadership is extremely long.  I think it is 
> pretty close to the 160-line method limit - I tried to modify it but it 
> would hit the limit when I tried to break things into separate lines.
> The method actually contains two tests, so it is best to split it into two separate 
> tests.
> We should also move this to ConsumerMetadata.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership

2023-12-21 Thread Alexander Aghili (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Aghili updated KAFKA-16027:
-
External issue URL:   (was: https://github.com/apache/kafka/pull/15055)

> Refactor MetadataTest#testUpdatePartitionLeadership
> ---
>
> Key: KAFKA-16027
> URL: https://issues.apache.org/jira/browse/KAFKA-16027
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Philip Nee
>Assignee: Alexander Aghili
>Priority: Minor
>  Labels: newbie
>
> MetadataTest#testUpdatePartitionLeadership is extremely long.  I think it is 
> pretty close to the 160-line method limit - I tried to modify it but it 
> would hit the limit when I tried to break things into separate lines.
> The method actually contains two tests, so it is best to split it into two separate 
> tests.
> We should also move this to ConsumerMetadata.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership

2023-12-21 Thread Alexander Aghili (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Aghili reassigned KAFKA-16027:


Assignee: Alexander Aghili

> Refactor MetadataTest#testUpdatePartitionLeadership
> ---
>
> Key: KAFKA-16027
> URL: https://issues.apache.org/jira/browse/KAFKA-16027
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Philip Nee
>Assignee: Alexander Aghili
>Priority: Minor
>  Labels: newbie
>
> MetadataTest#testUpdatePartitionLeadership is extremely long.  I think it is 
> pretty close to the 160-line method limit - I tried to modify it but it 
> would hit the limit when I tried to break things into separate lines.
> The method actually contains two tests, so it is best to split it into two separate 
> tests.
> We should also move this to ConsumerMetadata.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


philipnee commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866810047

   Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16021: Eagerly look up StringSerializer encoding during configure [kafka]

2023-12-21 Thread via GitHub


srdo commented on PR #15024:
URL: https://github.com/apache/kafka/pull/15024#issuecomment-1866797498

   Thanks. Added the suggested test. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac merged PR #14985:
URL: https://github.com/apache/kafka/pull/14985


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on PR #14985:
URL: https://github.com/apache/kafka/pull/14985#issuecomment-1866794392

   We've got a decent build 
[here](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14985/9/tests/).
 All the failed tests are unrelated. I am going to merge it to trunk and 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]

2023-12-21 Thread via GitHub


CharlyRien commented on PR #14157:
URL: https://github.com/apache/kafka/pull/14157#issuecomment-1866758963

   Hello. 
   Any news on this issue? @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-12-21 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799532#comment-17799532
 ] 

Arpit Goyal commented on KAFKA-15388:
-

[~divijvaidya] [~satish.duggana] [~showuon] PR for review: 
https://github.com/apache/kafka/pull/15060

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, 
> tieredtopicloglist.png
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offsets from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offsets after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}
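
A hedged sketch of the fix proposed above for (2), using hypothetical segment types rather than the actual RemoteLogManager/metadata classes: when a compacted segment has no surviving record at or after the fetch offset, the lookup keeps scanning subsequent segments instead of returning nothing.

{code:java}
import java.util.List;
import java.util.Optional;

class Segment {
    final long baseOffset;
    final List<Long> recordOffsets; // offsets surviving compaction, sorted

    Segment(long baseOffset, List<Long> recordOffsets) {
        this.baseOffset = baseOffset;
        this.recordOffsets = recordOffsets;
    }
}

public class NextOffsetLookup {

    // Find the first surviving offset >= fetchOffset, looking across segments.
    static Optional<Long> nextAvailableOffset(List<Segment> segments, long fetchOffset) {
        for (Segment segment : segments) {
            for (long offset : segment.recordOffsets) {
                if (offset >= fetchOffset) {
                    return Optional.of(offset);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
            new Segment(0, List.of(0L, 3L)),   // offsets 1, 2, 4..9 compacted away
            new Segment(10, List.of(12L, 15L)));
        // Fetching offset 5 finds nothing left in the first segment, so the
        // lookup must continue into the next segment and return 12.
        System.out.println(nextAvailableOffset(segments, 5L)); // Optional[12]
    }
}
{code}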



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]

2023-12-21 Thread via GitHub


jolshan commented on PR #14599:
URL: https://github.com/apache/kafka/pull/14599#issuecomment-1866730662

   Sorry for the delay @MikeEdgar. That is a known issue for the test. I will 
also look at this PR and run the build again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


jolshan commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866728811

   Thanks for checking @philipnee. I will wait for this run to complete and we 
should be good to go.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]

2023-12-21 Thread via GitHub


iit2009060 opened a new pull request, #15060:
URL: https://github.com/apache/kafka/pull/15060

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Use --no-daemon when building with Jenkins [kafka]

2023-12-21 Thread via GitHub


ijuma commented on code in PR #15057:
URL: https://github.com/apache/kafka/pull/15057#discussion_r1434339183


##
Jenkinsfile:
##
@@ -21,7 +21,7 @@ def doValidation() {
   // Run all the tasks associated with `check` except for `test` - the latter 
is executed via `doTest`
   sh """
 ./retry_zinc ./gradlew -PscalaVersion=$SCALA_VERSION clean check -x test \
---profile --continue -PxmlSpotBugsReport=true -PkeepAliveMode="session"
+--no-daemon --profile --continue -PxmlSpotBugsReport=true 
-PkeepAliveMode="session"

Review Comment:
   We can probably remove `keepAliveMode` if we use `no-daemon`:
   
   > keepAliveMode: configures the keep alive mode for the Gradle compilation 
daemon - reuse improves start-up time. The values should be one of daemon or 
session (the default is daemon). daemon keeps the daemon alive until it's 
explicitly stopped while session keeps it alive until the end of the build 
session. This currently only affects the Scala compiler, see 
https://github.com/gradle/gradle/pull/21034 for a PR that attempts to do the 
same for the Java compiler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


philipnee commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866678547

   Consumer integration tests also seem fine.
   
![image](https://github.com/apache/kafka/assets/1930388/df5307c2-0735-4789-9b87-e6d5eb9f1e7d)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10897) kafka quota optimization

2023-12-21 Thread Afshin Moazami (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799504#comment-17799504
 ] 

Afshin Moazami commented on KAFKA-10897:


For topic-partition quota configuration, I proposed this KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1010%3A+Topic+Partition+Quota]

> kafka quota optimization
> 
>
> Key: KAFKA-10897
> URL: https://issues.apache.org/jira/browse/KAFKA-10897
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, config, consumer, core
>Affects Versions: 2.7.0
>Reporter: yangyijun
>Assignee: Kahn Cheny
>Priority: Blocker
>
> *1. The current quota dimensions are as follows:*
> {code:java}
> /config/users/<user>/clients/<client-id>
> /config/users/<user>/clients/<default>
> /config/users/<user>
> /config/users/<default>/clients/<client-id>
> /config/users/<default>/clients/<default>
> /config/users/<default>
> /config/clients/<client-id>
> /config/clients/<default>{code}
> *2. Existing problems:*
>  
> {code:java}
> 2.1. The quota dimensions are not fine-grained enough.
> 2.2. When multiple users on the same broker produce and consume a large amount 
> of data at the same time, if you want the broker to run normally, you must 
> make the sum of all user quota bytes not exceed the upper throughput limit of 
> the broker.
> 2.3. Even if no user rate reaches the upper limit of the broker, if all the 
> user rates are concentrated on a few disks and exceed the read-write load of 
> those disks, all produce and consume requests will be blocked.
> 2.4. Sometimes only one topic's rate increases sharply under a user, so we 
> only need to limit those sharply increasing topics.
> {code}
>  
> *3. Suggestions for improvement*
> {code:java}
> 3.1. Add an upper limit on quota bytes for a single broker.
> 3.2. Add an upper limit on quota bytes for a single disk on the broker.
> 3.3. Add topic quota dimensions.{code}
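
For context, a minimal sketch (broker address and user name are placeholders) of how quotas are configured today at the user level via the KIP-546 Admin API; this entity granularity is what the suggestions above want to extend with broker, disk, and topic dimensions.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class UserQuotaExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Quotas are keyed by user (and optionally client-id), not by topic
            // or partition -- hence the request for finer-grained dimensions.
            ClientQuotaEntity user = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user1"));
            ClientQuotaAlteration alteration = new ClientQuotaAlteration(user, List.of(
                new ClientQuotaAlteration.Op("producer_byte_rate", 1_048_576.0),
                new ClientQuotaAlteration.Op("consumer_byte_rate", 2_097_152.0)));
            admin.alterClientQuotas(List.of(alteration)).all().get();
        }
    }
}
{code}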



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16041) Replace Afterburn module with Blackbird

2023-12-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799498#comment-17799498
 ] 

Mickael Maison commented on KAFKA-16041:


This is follow-up work from https://issues.apache.org/jira/browse/KAFKA-15996

> Replace Afterburn module with Blackbird
> ---
>
> Key: KAFKA-16041
> URL: https://issues.apache.org/jira/browse/KAFKA-16041
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Mario Fiore Vitale
>Priority: Major
> Fix For: 4.0.0
>
>
> [Blackbird|https://github.com/FasterXML/jackson-modules-base/blob/master/blackbird/README.md]
>  is the Afterburner replacement for Java 11+



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


philipnee commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866627736

   
![image](https://github.com/apache/kafka/assets/1930388/6d79e6b4-76dc-49cc-8cbb-9b237a5dc7ef)
   
   Unit tests seem fine; there is 1 known flaky test under AbstractCoordinator 
and 1 related to AsyncKafkaConsumer.  Both are unrelated to the metadata change 
here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16044) Throttling using Topic Partition Quota

2023-12-21 Thread Afshin Moazami (Jira)
Afshin Moazami created KAFKA-16044:
--

 Summary: Throttling using Topic Partition Quota 
 Key: KAFKA-16044
 URL: https://issues.apache.org/jira/browse/KAFKA-16044
 Project: Kafka
  Issue Type: New Feature
Reporter: Afshin Moazami


With KAFKA-16042 introducing the topic-partition byte rate and metrics, and 
KAFKA-16043 introducing the quota limit configuration at the topic level, we 
can enforce quotas at the topic-partition level for configured topics. 

More details are in 
[KIP-1010|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1010%3A+Topic+Partition+Quota]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16043) Add Quota configuration for topics

2023-12-21 Thread Afshin Moazami (Jira)
Afshin Moazami created KAFKA-16043:
--

 Summary: Add Quota configuration for topics
 Key: KAFKA-16043
 URL: https://issues.apache.org/jira/browse/KAFKA-16043
 Project: Kafka
  Issue Type: New Feature
Reporter: Afshin Moazami


To be able to have topic-partition quotas, we need to introduce two topic 
configurations for the producer-byte-rate and consumer-byte-rate. 

The assumption is that all partitions of the same topic get the same quota, so 
we define one config per topic. 

This configuration should work with both ZooKeeper and KRaft setups. 

Also, we should define a default quota value (to be discussed) and potentially 
use the same format as the user/client default configuration, using `<default>` as 
the value. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16042) Quota Metrics based on per topic-partition produce/fetch byte rate

2023-12-21 Thread Afshin Moazami (Jira)
Afshin Moazami created KAFKA-16042:
--

 Summary: Quota Metrics based on per topic-partition produce/fetch 
byte rate
 Key: KAFKA-16042
 URL: https://issues.apache.org/jira/browse/KAFKA-16042
 Project: Kafka
  Issue Type: New Feature
  Components: core
Reporter: Afshin Moazami
Assignee: Afshin Moazami


Currently, Kafka emits the producer-byte-rate and fetch-bytes-rate for quota 
calculations. By adding a new signature to the 
`[quotaMetricTags|https://github.com/afshing/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java#L40]`
 method that takes the individual topic-partition sizes as a parameter, we can 
define metrics based on the topic name and partition id. 

To do that, we need both `ProduceRequest` and `FetchResponse` to expose a public 
`partitionSizes` method. 
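
Purely illustrative: one possible shape of such an overload. Today's `ClientQuotaCallback#quotaMetricTags` takes only the quota type, principal, and client id; the extra parameter below is an assumption for illustration, not the actual KIP-1010 API.

{code:java}
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.quota.ClientQuotaType;

// Hypothetical interface sketching the idea of a partition-aware callback.
interface PartitionAwareQuotaCallback {

    // With per topic-partition sizes available, the returned tags could include
    // topic and partition, enabling per topic-partition byte-rate metrics.
    Map<String, String> quotaMetricTags(
        ClientQuotaType quotaType,
        KafkaPrincipal principal,
        String clientId,
        Map<TopicPartition, Integer> partitionSizes);
}
{code}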



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


philipnee commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866607465

   Hey @jolshan - Thanks.  OOM also happens on another PR of mine, see #15035 - 
I thought it was introduced by the AsyncKafkaConsumerTest but apparently 
there's more to it... Let me verify the tests locally and I'll post the results 
there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]

2023-12-21 Thread via GitHub


jolshan commented on PR #15023:
URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866595042

   I'm seeing some OOMs and such again. I will run one more time. @philipnee, if 
you don't mind, just double-check the tests locally on your end. I'm not 
seeing any failing consistently, but just want to be extra careful near the 
code freeze. I will check back in a few hours or so.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]

2023-12-21 Thread via GitHub


mimaison commented on code in PR #15048:
URL: https://github.com/apache/kafka/pull/15048#discussion_r1434227907


##
docker/jvm/launch:
##
@@ -28,25 +27,25 @@ fi
 # the default is to pick the first IP (or network).
 export KAFKA_JMX_HOSTNAME=${KAFKA_JMX_HOSTNAME:-$(hostname -i | cut -d" " -f1)}
 
-if [ "$KAFKA_JMX_PORT" ]; then
+if [ "${KAFKA_JMX_PORT-}" ]; then
   # This ensures that the "if" section for JMX_PORT in kafka launch script 
does not trigger.
 export JMX_PORT=$KAFKA_JMX_PORT
-export KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS 
-Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME 
-Dcom.sun.management.jmxremote.local.only=false 
-Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT 
-Dcom.sun.management.jmxremote.port=$JMX_PORT"
+export KAFKA_JMX_OPTS="${KAFKA_JMX_OPTS-} 
-Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME 
-Dcom.sun.management.jmxremote.local.only=false 
-Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT 
-Dcom.sun.management.jmxremote.port=$JMX_PORT"
 fi
 
 # Make a temp env variable to store user provided performance otps
-if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
+if [ -z "${KAFKA_JVM_PERFORMANCE_OPTS-}" ]; then
 export TEMP_KAFKA_JVM_PERFORMANCE_OPTS=""
 else
 export TEMP_KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS"
 fi
 
 # We will first use CDS for storage to format storage
-export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS 
-XX:SharedArchiveFile=/opt/kafka/storage.jsa"
+export KAFKA_JVM_PERFORMANCE_OPTS="${KAFKA_JVM_PERFORMANCE_OPTS-} 
-XX:SharedArchiveFile=/opt/kafka/storage.jsa"
 
 echo "===> Using provided cluster id $CLUSTER_ID ..."
 # A bit of a hack to not error out if the storage is already formatted. Need 
storage-tool to support this

Review Comment:
   Could we handle that in the wrapper?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1434244004


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
 );
 }
 
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) 
throws ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Transactional write #1.
+CompletableFuture write1 = 
runtime.scheduleTransactionalWriteOperation(
+"write#1",
+TP,
+"transactional-id",
+100L,
+(short) 5,
+DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+);
+
+// Verify that the write is not committed yet.
+assertFalse(write1.isDone());
+
+// The last written offset is updated.
+assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator. They are stored in
+// the pending set for now.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+100L
+));
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+), writer.entries(TP));
+
+// Complete transaction #1.
+CompletableFuture complete1 = 
runtime.scheduleTransactionCompletion(
+"complete#1",
+TP,
+100L,
+(short) 5,
+10,
+result,
+DEFAULT_WRITE_TIMEOUT
+);
+
+// Verify that the completion is not committed yet.
+assertFalse(complete1.isDone());
+
+// The last written offset is updated.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator.
+if (result == TransactionResult.COMMIT) {
+// They are now in the records set if committed.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+} else {
+// Or they are gone if aborted.
+assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
+}
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, 
result)
+), writer.entries(TP));
+
+// Commit write #1.
+writer.commit(TP, 2);
+
+// The write is completed.
+assertTrue(write1.isDone());
+assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+// Commit completion #1.
+

Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1434242991


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -102,6 +241,31 @@ public long append(
 }
 }
 
+@Override
+public long appendEndTransactionMarker(
+TopicPartition tp,
+long producerId,
+short producerEpoch,
+int coordinatorEpoch,
+TransactionResult result
+) throws KafkaException {
+PartitionState state = partitionState(tp);
+state.lock.lock();
+try {
+state.entries.add(new LogControl(
+producerId,
+producerEpoch,
+coordinatorEpoch,
+result
+));
+state.endOffset += 1;
+if (autoCommit) commit(tp, state.endOffset);

Review Comment:
   synced offline -- this is like acks=all for one replica and helpful in tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 ConfigType moved to server-common [kafka]

2023-12-21 Thread via GitHub


nizhikov commented on code in PR #14867:
URL: https://github.com/apache/kafka/pull/14867#discussion_r1434240386


##
core/src/main/scala/kafka/zk/ZkData.scala:
##
@@ -1103,11 +1105,11 @@ object ZkData {
 IsrChangeNotificationZNode.path,
 ProducerIdBlockZNode.path,
 LogDirEventNotificationZNode.path
-  ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)
+  ) ++ 
JavaConverters.asScalaIteratorConverter(ConfigType.ALL.iterator()).asScala.map(ConfigEntityTypeZNode.path)

Review Comment:
   Great catch. Thanks. Rewrote like you suggested.



##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -77,8 +78,9 @@ object ConfigCommand extends Logging {
 
   val BrokerDefaultEntityName = ""
   val BrokerLoggerConfigType = "broker-loggers"
-  val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType :+ 
ConfigType.ClientMetrics
-  val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker)
+  @nowarn("cat=deprecation")
+  val BrokerSupportedConfigTypes = 
JavaConverters.asScalaIteratorConverter(ConfigType.ALL.iterator()).asScala.toSeq
 :+ BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS

Review Comment:
   Great catch. Thanks. Rewrote like you suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]

2023-12-21 Thread via GitHub


mfvitale commented on PR #14992:
URL: https://github.com/apache/kafka/pull/14992#issuecomment-1866538380

   > Thank you for this PR. I have left some minor comments.
   > 
   > Also, please add a JIRA targeting 4.0 (where we will deprecate JDK 8) to 
replace afterburn with blackbird.
   
   @divijvaidya https://issues.apache.org/jira/browse/KAFKA-16041


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16041) Replace Afterburn module with Blackbird

2023-12-21 Thread Mario Fiore Vitale (Jira)
Mario Fiore Vitale created KAFKA-16041:
--

 Summary: Replace Afterburn module with Blackbird
 Key: KAFKA-16041
 URL: https://issues.apache.org/jira/browse/KAFKA-16041
 Project: Kafka
  Issue Type: Task
  Components: connect
Reporter: Mario Fiore Vitale
 Fix For: 4.0.0


[Blackbird|https://github.com/FasterXML/jackson-modules-base/blob/master/blackbird/README.md]
 is the Afterburner replacement for Java 11+
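
A minimal sketch of the swap, assuming the jackson-module-afterburner and jackson-module-blackbird artifacts are on the classpath; registering the module is essentially the only code change on an ObjectMapper.

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import com.fasterxml.jackson.module.blackbird.BlackbirdModule;

public class JacksonModuleSwap {
    public static void main(String[] args) {
        // Java 8 era: Afterburner speeds up (de)serialization via bytecode generation.
        ObjectMapper withAfterburner = new ObjectMapper()
                .registerModule(new AfterburnerModule());

        // Java 11+: Blackbird achieves the same via MethodHandles/LambdaMetafactory.
        ObjectMapper withBlackbird = new ObjectMapper()
                .registerModule(new BlackbirdModule());

        System.out.println(withAfterburner.getRegisteredModuleIds());
        System.out.println(withBlackbird.getRegisteredModuleIds());
    }
}
{code}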



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]

2023-12-21 Thread via GitHub


mfvitale commented on code in PR #14992:
URL: https://github.com/apache/kafka/pull/14992#discussion_r1434225385


##
build.gradle:
##
@@ -940,6 +940,7 @@ project(':core') {
 implementation libs.jacksonModuleScala
 implementation libs.jacksonDataformatCsv
 implementation libs.jacksonJDK8Datatypes
+implementation libs.jacksonAfterburner

Review Comment:
   I have been misled by other Jackson dependencies. Removed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1434220316


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -102,6 +241,31 @@ public long append(
 }
 }
 
+@Override
+public long appendEndTransactionMarker(
+TopicPartition tp,
+long producerId,
+short producerEpoch,
+int coordinatorEpoch,
+TransactionResult result
+) throws KafkaException {
+PartitionState state = partitionState(tp);
+state.lock.lock();
+try {
+state.entries.add(new LogControl(
+producerId,
+producerEpoch,
+coordinatorEpoch,
+result
+));
+state.endOffset += 1;
+if (autoCommit) commit(tp, state.endOffset);

Review Comment:
   No. It is more like having a partition with a single replica, so the HWM 
advances directly. Keep in mind that this is designed to be used in tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1434218393


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
 );
 }
 
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) 
throws ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Transactional write #1.
+CompletableFuture write1 = 
runtime.scheduleTransactionalWriteOperation(
+"write#1",
+TP,
+"transactional-id",
+100L,
+(short) 5,
+DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+);
+
+// Verify that the write is not committed yet.
+assertFalse(write1.isDone());
+
+// The last written offset is updated.
+assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator. They are stored in
+// the pending set for now.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+100L
+));
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+), writer.entries(TP));
+
+// Complete transaction #1.
+CompletableFuture complete1 = 
runtime.scheduleTransactionCompletion(
+"complete#1",
+TP,
+100L,
+(short) 5,
+10,
+result,
+DEFAULT_WRITE_TIMEOUT
+);
+
+// Verify that the completion is not committed yet.
+assertFalse(complete1.isDone());
+
+// The last written offset is updated.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator.
+if (result == TransactionResult.COMMIT) {
+// They are now in the records set if committed.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+} else {
+// Or they are gone if aborted.
+assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
+}
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, 
result)
+), writer.entries(TP));
+
+// Commit write #1.
+writer.commit(TP, 2);
+
+// The write is completed.
+assertTrue(write1.isDone());
+assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+// Commit completion #1.
+

Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1434215098


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -102,6 +241,31 @@ public long append(
 }
 }
 
+@Override
+public long appendEndTransactionMarker(
+TopicPartition tp,
+long producerId,
+short producerEpoch,
+int coordinatorEpoch,
+TransactionResult result
+) throws KafkaException {
+PartitionState state = partitionState(tp);
+state.lock.lock();
+try {
+state.entries.add(new LogControl(
+producerId,
+producerEpoch,
+coordinatorEpoch,
+result
+));
+state.endOffset += 1;
+if (autoCommit) commit(tp, state.endOffset);

Review Comment:
   Is this like acks = 0? I wasn't aware of when this was used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1434214195


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
 );
 }
 
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) 
throws ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Transactional write #1.
+CompletableFuture write1 = 
runtime.scheduleTransactionalWriteOperation(
+"write#1",
+TP,
+"transactional-id",
+100L,
+(short) 5,
+DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+);
+
+// Verify that the write is not committed yet.
+assertFalse(write1.isDone());
+
+// The last written offset is updated.
+assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator. They are stored in
+// the pending set for now.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+100L
+));
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+), writer.entries(TP));
+
+// Complete transaction #1.
+CompletableFuture complete1 = 
runtime.scheduleTransactionCompletion(
+"complete#1",
+TP,
+100L,
+(short) 5,
+10,
+result,
+DEFAULT_WRITE_TIMEOUT
+);
+
+// Verify that the completion is not committed yet.
+assertFalse(complete1.isDone());
+
+// The last written offset is updated.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator.
+if (result == TransactionResult.COMMIT) {
+// They are now in the records set if committed.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+} else {
+// Or they are gone if aborted.
+assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
+}
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, 
result)
+), writer.entries(TP));
+
+// Commit write #1.
+writer.commit(TP, 2);
+
+// The write is completed.
+assertTrue(write1.isDone());
+assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+// Commit completion #1.
+

Re: [PR] KAFKA-14588 ConfigType moved to server-common [kafka]

2023-12-21 Thread via GitHub


mimaison commented on code in PR #14867:
URL: https://github.com/apache/kafka/pull/14867#discussion_r1434193675


##
core/src/main/scala/kafka/zk/ZkData.scala:
##
@@ -1103,11 +1105,11 @@ object ZkData {
 IsrChangeNotificationZNode.path,
 ProducerIdBlockZNode.path,
 LogDirEventNotificationZNode.path
-  ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)
+  ) ++ 
JavaConverters.asScalaIteratorConverter(ConfigType.ALL.iterator()).asScala.map(ConfigEntityTypeZNode.path)

Review Comment:
   Same as above, can we build this collection without using `JavaConverters`?



##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -77,8 +78,9 @@ object ConfigCommand extends Logging {
 
   val BrokerDefaultEntityName = ""
   val BrokerLoggerConfigType = "broker-loggers"
-  val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType :+ 
ConfigType.ClientMetrics
-  val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker)
+  @nowarn("cat=deprecation")
+  val BrokerSupportedConfigTypes = 
JavaConverters.asScalaIteratorConverter(ConfigType.ALL.iterator()).asScala.toSeq
 :+ BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS

Review Comment:
   Why are we using an iterator here? Can we use `val 
BrokerSupportedConfigTypes: Seq[String] = ConfigType.ALL.asScala :+ 
BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [KAFKA-16015] Fix custom timeouts overwritten by defaults [kafka]

2023-12-21 Thread via GitHub


pprovenzano commented on PR #15030:
URL: https://github.com/apache/kafka/pull/15030#issuecomment-1866442934

   > @pprovenzano , I dont have permissions to merge the PR, so I guess 
somebody else eventually will do it, is that correct? thanks!
   
   I don't have them either. I've asked another to merge and also cherry-pick 
to 3.7
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15456) Add support for OffsetFetch version 9 in consumer

2023-12-21 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15456.
-
Resolution: Fixed

> Add support for OffsetFetch version 9 in consumer
> -
>
> Key: KAFKA-15456
> URL: https://issues.apache.org/jira/browse/KAFKA-15456
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15456: Commit/Fetch error handling improvements and V9 support [kafka]

2023-12-21 Thread via GitHub


dajac merged PR #14557:
URL: https://github.com/apache/kafka/pull/14557


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15456: Commit/Fetch error handling improvements and V9 support [kafka]

2023-12-21 Thread via GitHub


dajac commented on PR #14557:
URL: https://github.com/apache/kafka/pull/14557#issuecomment-1866401989

   We've got a reasonably good build here: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14557/34/tests/.
 Based on this one, it is safe to merge this PR. I will merge it to trunk and 
3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


dajac commented on PR #15059:
URL: https://github.com/apache/kafka/pull/15059#issuecomment-1866369216

   @lucasbru Thanks. I just fixed them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


lucasbru commented on PR #15059:
URL: https://github.com/apache/kafka/pull/15059#issuecomment-1866354730

   Please check
   * `GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS`
   * Some comments in `GroupCoordinator`
   * Various names in `GroupCoordinatorMetrics`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16030) new group coordinator should check if partition goes offline during load

2023-12-21 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16030.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> new group coordinator should check if partition goes offline during load
> 
>
> Key: KAFKA-16030
> URL: https://issues.apache.org/jira/browse/KAFKA-16030
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.7.0
>
>
> The new coordinator stops loading if the partition goes offline during load. 
> However, the partition is still considered active. Instead, we should return 
> NOT_LEADER_OR_FOLLOWER exception during load.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
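
A minimal sketch of the behaviour described above (this is not the actual
coordinator loader code; isStillLeader and loadOneBatch are hypothetical
hooks): the load loop re-checks leadership between batches and fails the
pending future instead of leaving the partition looking active.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

public class LoadGuardSketch {

    // isStillLeader and loadOneBatch are hypothetical stand-ins, not the real loader API.
    static CompletableFuture<Void> load(BooleanSupplier isStillLeader, Runnable loadOneBatch, int batches) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        for (int i = 0; i < batches; i++) {
            if (!isStillLeader.getAsBoolean()) {
                // The real fix surfaces NOT_LEADER_OR_FOLLOWER to the caller at this point.
                future.completeExceptionally(new IllegalStateException("not leader or follower"));
                return future;
            }
            loadOneBatch.run();
        }
        future.complete(null);
        return future;
    }

    public static void main(String[] args) {
        load(() -> false, () -> { }, 3).whenComplete((v, t) ->
            System.out.println(t == null ? "loaded" : "load failed: " + t.getMessage()));
    }
}
{code}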


Re: [PR] KAFKA-16030: new group coordinator should check if partition goes offline during load [kafka]

2023-12-21 Thread via GitHub


dajac commented on PR #15043:
URL: https://github.com/apache/kafka/pull/15043#issuecomment-1866342682

   It is hard to get a clean build here too, but the failures are unrelated. I 
also verified it locally. I will merge it to trunk and 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16030: new group coordinator should check if partition goes offline during load [kafka]

2023-12-21 Thread via GitHub


dajac merged PR #15043:
URL: https://github.com/apache/kafka/pull/15043


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]

2023-12-21 Thread via GitHub


divijvaidya commented on code in PR #14992:
URL: https://github.com/apache/kafka/pull/14992#discussion_r1434120692


##
build.gradle:
##
@@ -940,6 +940,7 @@ project(':core') {
 implementation libs.jacksonModuleScala
 implementation libs.jacksonDataformatCsv
 implementation libs.jacksonJDK8Datatypes
+implementation libs.jacksonAfterburner

Review Comment:
   Please help me understand: why did we add this dependency in clients?



##
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java:
##
@@ -235,18 +235,24 @@ public Object toConnect(final Schema schema, final 
JsonNode value) {
 private final JsonDeserializer deserializer;
 
 public JsonConverter() {
+this(true);
+}
+
+public JsonConverter(boolean enableModules) {
 serializer = new JsonSerializer(
-mkSet(),
-JSON_NODE_FACTORY
+mkSet(),

Review Comment:
   nit
   
   please maintain original indentation



##
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java:
##
@@ -235,18 +235,24 @@ public Object toConnect(final Schema schema, final 
JsonNode value) {
 private final JsonDeserializer deserializer;
 
 public JsonConverter() {
+this(true);
+}
+
+public JsonConverter(boolean enableModules) {

Review Comment:
   Please add a javadoc comment specifying that this ctor is visible only for 
benchmarking. We add similar comments when we expose a new ctor/method for 
testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
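
For context on what the new dependency provides: Jackson's Afterburner module
replaces reflective (de)serialization with generated bytecode, which is
presumably the kind of module the enableModules flag above toggles. A
standalone sketch of registering it on an ObjectMapper (not the PR's actual
wiring):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;

import java.util.Collections;

public class AfterburnerSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Afterburner only changes how properties are accessed; the output is identical,
        // the hot (de)serialization paths are just faster.
        mapper.registerModule(new AfterburnerModule());
        System.out.println(mapper.writeValueAsString(Collections.singletonMap("field", "value")));
    }
}
```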



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #15059:
URL: https://github.com/apache/kafka/pull/15059#discussion_r1434128032


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##
@@ -70,11 +70,11 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
 public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits";
 public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
 public static final String OFFSET_DELETIONS_SENSOR_NAME = 
"OffsetDeletions";
-public static final String GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME 
= "CompletedRebalances";
-public static final String GENERIC_GROUP_REBALANCES_SENSOR_NAME = 
"GenericGroupRebalances";
+public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME 
= "CompletedRebalances";
+public static final String CLASSIC_GROUP_REBALANCES_SENSOR_NAME = 
"GenericGroupRebalances";

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #15059:
URL: https://github.com/apache/kafka/pull/15059#discussion_r1434112503


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##
@@ -70,11 +70,11 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
 public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits";
 public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
 public static final String OFFSET_DELETIONS_SENSOR_NAME = 
"OffsetDeletions";
-public static final String GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME 
= "CompletedRebalances";
-public static final String GENERIC_GROUP_REBALANCES_SENSOR_NAME = 
"GenericGroupRebalances";
+public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME 
= "CompletedRebalances";
+public static final String CLASSIC_GROUP_REBALANCES_SENSOR_NAME = 
"GenericGroupRebalances";

Review Comment:
   Indeed, we should rename this one. Missed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


divijvaidya commented on code in PR #15059:
URL: https://github.com/apache/kafka/pull/15059#discussion_r1434097877


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##
@@ -70,11 +70,11 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
 public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits";
 public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
 public static final String OFFSET_DELETIONS_SENSOR_NAME = 
"OffsetDeletions";
-public static final String GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME 
= "CompletedRebalances";
-public static final String GENERIC_GROUP_REBALANCES_SENSOR_NAME = 
"GenericGroupRebalances";
+public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME 
= "CompletedRebalances";
+public static final String CLASSIC_GROUP_REBALANCES_SENSOR_NAME = 
"GenericGroupRebalances";

Review Comment:
   I should have added my previous comment over here. Let's continue the 
conversation here.
   
   I understand that we have only changed the variable name of the sensor here. 
My question is: should we change "GenericGroupRebalances" to 
"ClassicGroupRebalances" here, given that it was only introduced in 3.7?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #15059:
URL: https://github.com/apache/kafka/pull/15059#discussion_r1434068651


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##
@@ -319,40 +319,40 @@ public Long value() {
 registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.PREPARING_REBALANCE);
+return numGenericGroups(ClassicGroupState.PREPARING_REBALANCE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return 
numGenericGroups(GenericGroupState.COMPLETING_REBALANCE);
+return 
numGenericGroups(ClassicGroupState.COMPLETING_REBALANCE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.STABLE);
+return numGenericGroups(ClassicGroupState.STABLE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.DEAD);
+return numGenericGroups(ClassicGroupState.DEAD);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new 
com.yammer.metrics.core.Gauge() {

Review Comment:
   Regarding `GENERIC_GROUP_REBALANCES_SENSOR_NAME`, the patch only changes the 
name of the sensor. The metrics are not touched.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #15059:
URL: https://github.com/apache/kafka/pull/15059#discussion_r1434063518


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##
@@ -319,40 +319,40 @@ public Long value() {
 registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.PREPARING_REBALANCE);
+return numGenericGroups(ClassicGroupState.PREPARING_REBALANCE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return 
numGenericGroups(GenericGroupState.COMPLETING_REBALANCE);
+return 
numGenericGroups(ClassicGroupState.COMPLETING_REBALANCE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.STABLE);
+return numGenericGroups(ClassicGroupState.STABLE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.DEAD);
+return numGenericGroups(ClassicGroupState.DEAD);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new 
com.yammer.metrics.core.Gauge() {

Review Comment:
   We don't have any existing metrics using "generic". They are all new in 3.7.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


divijvaidya commented on code in PR #15059:
URL: https://github.com/apache/kafka/pull/15059#discussion_r1434047673


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##
@@ -319,40 +319,40 @@ public Long value() {
 registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.PREPARING_REBALANCE);
+return numGenericGroups(ClassicGroupState.PREPARING_REBALANCE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return 
numGenericGroups(GenericGroupState.COMPLETING_REBALANCE);
+return 
numGenericGroups(ClassicGroupState.COMPLETING_REBALANCE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.STABLE);
+return numGenericGroups(ClassicGroupState.STABLE);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new 
com.yammer.metrics.core.Gauge() {
 @Override
 public Long value() {
-return numGenericGroups(GenericGroupState.DEAD);
+return numGenericGroups(ClassicGroupState.DEAD);
 }
 });
 
 registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new 
com.yammer.metrics.core.Gauge() {

Review Comment:
   Are we keeping the metric names with the "generic" keyword though? I think we 
should if these metrics have already been released in previous versions; if 
not, we might want to change the metric names to "classic" as well.
   
   e.g. GENERIC_GROUP_REBALANCES_SENSOR_NAME



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-12-21 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799416#comment-17799416
 ] 

Divij Vaidya edited comment on KAFKA-15388 at 12/21/23 12:56 PM:
-

Hey [~enether] 
I will not consider this a blocker for 3.7.0 since Tiered Storage is not 
production ready in 3.7.0. We will continue to carry this bug (as we did in 
3.6) until it is fixed. If we are able to fix it within the 3.7 timeline, well 
and good; if not, it will ship in 3.8.

This is a blocker for TS production-ready status, but that will be captured 
somewhere else.


was (Author: divijvaidya):
Hey [~enether] 
I will not consider this as a blocker for 3.7.0 since Tiered Storage is not GA 
in 3.7.0. We will continue to carry this bug (as we did in 3.6) until it is 
fixed. If we are able to fix this in timeline for 3.7, well and good but if 
not, we will ship this in 3.8.

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, 
> tieredtopicloglist.png
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offset from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offset after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16040) Rename `Generic` to `Classic`

2023-12-21 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-16040:

Description: People have raised concerns about using {{Generic}} as a name 
to designate the old rebalance protocol. We considered using {{Legacy}} but 
discarded it because there are still applications, such as Connect, using the 
old protocol. We settled on using {{Classic}} for the {{{}Classic Rebalance 
Protocol{}}}.

> Rename `Generic` to `Classic`
> -
>
> Key: KAFKA-16040
> URL: https://issues.apache.org/jira/browse/KAFKA-16040
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 3.7.0
>
>
> People have raised concerns about using {{Generic}} as a name to designate 
> the old rebalance protocol. We considered using {{Legacy}} but discarded it 
> because there are still applications, such as Connect, using the old 
> protocol. We settled on using {{Classic}} for the {{{}Classic Rebalance 
> Protocol{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-12-21 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15388:
-
Priority: Major  (was: Blocker)

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, 
> tieredtopicloglist.png
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offset from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offset after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-12-21 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799416#comment-17799416
 ] 

Divij Vaidya commented on KAFKA-15388:
--

Hey [~enether] 
I will not consider this as a blocker for 3.7.0 since Tiered Storage is not GA 
in 3.7.0. We will continue to carry this bug (as we did in 3.6) until it is 
fixed. If we are able to fix this in timeline for 3.7, well and good but if 
not, we will ship this in 3.8.

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Blocker
> Fix For: 3.7.0
>
> Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, 
> tieredtopicloglist.png
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offset from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offset after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
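
A minimal sketch of the proposed fix for (2) above, i.e. falling through to the
next remote segment when the requested offset sits in a gap left by compaction.
The SegmentMeta type and the lookup helper are hypothetical stand-ins, not the
actual remote log metadata API.

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class NextOffsetLookupSketch {

    // Hypothetical stand-in for a remote segment's metadata.
    static final class SegmentMeta {
        final long baseOffset;
        final long endOffset;

        SegmentMeta(long baseOffset, long endOffset) {
            this.baseOffset = baseOffset;
            this.endOffset = endOffset;
        }
    }

    // Returns the segment containing targetOffset or, if the offset falls in a gap,
    // the next segment with a greater base offset. Assumes segments sorted by baseOffset.
    static Optional<SegmentMeta> segmentForOffset(List<SegmentMeta> segments, long targetOffset) {
        for (SegmentMeta segment : segments) {
            if (segment.endOffset >= targetOffset) {
                return Optional.of(segment);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Offsets 50-99 were compacted away before tiering; a fetch at 60 should be
        // served from the segment starting at 100 rather than failing.
        List<SegmentMeta> segments = Arrays.asList(new SegmentMeta(0, 49), new SegmentMeta(100, 199));
        segmentForOffset(segments, 60).ifPresent(s ->
            System.out.println("serve fetch starting at offset " + Math.max(60L, s.baseOffset)));
    }
}
{code}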


Re: [PR] KAFKA-14888: Added remote log segments retention functionality based on time and size. [kafka]

2023-12-21 Thread via GitHub


divijvaidya commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1434040959


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -761,11 +784,385 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional retentionSizeData;
+private final Optional retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional 
retentionSizeData, Optional retentionTimeData) {
+this.retentionSizeData = retentionSizeData;
+this.retentionTimeData = retentionTimeData;
+remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionSizeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size >= 0
+if (remainingBreachedSize > 0) {
+long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+if (remainingBytes >= 0) {
+remainingBreachedSize = remainingBytes;
+return true;
+}
+}
+
+return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);

Review Comment:
   Yes, that is correct. The copying functionality is not impacted, as discussed 
in https://issues.apache.org/jira/browse/KAFKA-15388. Only the read-from-remote 
path is impacted for historically compacted topics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]

2023-12-21 Thread via GitHub


dajac opened a new pull request, #15059:
URL: https://github.com/apache/kafka/pull/15059

   People have raised concerns about using `Generic` as a name to designate the 
old rebalance protocol. We considered using `Legacy` but discarded it because 
there are still applications, such as Connect, using the old protocol. We 
settled on using `Classic` for the `Classic Rebalance Protocol`.
   
   The changes in this patch are extremely mechanical. It basically replaces 
the occurrences of `generic` by `classic`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-12-21 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799399#comment-17799399
 ] 

Arpit Goyal edited comment on KAFKA-15388 at 12/21/23 12:50 PM:


[~enether] [~satish.duggana] [~divijvaidya] [~enether] I am currently working 
on the fix. Hopefully I will create a PR by EOD.


was (Author: JIRAUSER301926):
[~enether] [~satish.duggana] [~divijvaidya] [~enether] I am currently working 
on the fix. Hopefully will create a PR by Saturday.

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Blocker
> Fix For: 3.7.0
>
> Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, 
> tieredtopicloglist.png
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offset from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offset after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13950) Resource leak at multiple places in the code

2023-12-21 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-13950:
-
Fix Version/s: 3.8.0
   (was: 3.7.0)

> Resource leak at multiple places in the code
> 
>
> Key: KAFKA-13950
> URL: https://issues.apache.org/jira/browse/KAFKA-13950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, kraft, streams, unit tests
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
> Fix For: 3.8.0
>
>
> I ran Amazon CodeGuru Reviewer on Apache Kafka's code base and the tool 
> detected various places where Closeable resources are not being closed 
> properly, leading to leaks.
> This task will fix the resource leak detected at multiple places.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
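
As a generic illustration of the class of fix involved (not code from the PR
itself): try-with-resources closes a Closeable on every exit path, including
when an exception is thrown part-way through, which is the pattern that removes
this kind of leak.

{code:java}
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class TryWithResourcesSketch {

    // The reader is closed automatically even if readLine throws.
    static String firstLine(String path) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            return reader.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(firstLine(args.length > 0 ? args[0] : "/etc/hostname"));
    }
}
{code}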


[jira] [Created] (KAFKA-16040) Rename `Generic` to `Classic`

2023-12-21 Thread David Jacot (Jira)
David Jacot created KAFKA-16040:
---

 Summary: Rename `Generic` to `Classic`
 Key: KAFKA-16040
 URL: https://issues.apache.org/jira/browse/KAFKA-16040
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot
 Fix For: 3.7.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-13950: Fix resource leak in error scenarios [kafka]

2023-12-21 Thread via GitHub


divijvaidya merged PR #12228:
URL: https://github.com/apache/kafka/pull/12228


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16036) Add `group.coordinator.rebalance.protocols` and publish all new configs

2023-12-21 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16036.
-
Resolution: Fixed

> Add `group.coordinator.rebalance.protocols` and publish all new configs
> ---
>
> Key: KAFKA-16036
> URL: https://issues.apache.org/jira/browse/KAFKA-16036
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16035: add tests for remoteLogSizeComputationTime/remoteFetchExpiresPerSec metrics [kafka]

2023-12-21 Thread via GitHub


showuon commented on PR #15056:
URL: https://github.com/apache/kafka/pull/15056#issuecomment-1866185768

   @satishd @kamalcph @clolov , call for review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16036; Add `group.coordinator.rebalance.protocols` and publish all new configs [kafka]

2023-12-21 Thread via GitHub


dajac merged PR #15053:
URL: https://github.com/apache/kafka/pull/15053


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16036; Add `group.coordinator.rebalance.protocols` and publish all new configs [kafka]

2023-12-21 Thread via GitHub


dajac commented on PR #15053:
URL: https://github.com/apache/kafka/pull/15053#issuecomment-1866183110

   It is impossible to get a clean build at the moment. There are OOM errors 
all over the place, including in trunk. Looking at all the builds, it is clear 
that this PR does not create persistent failures. I also ran all the tests 
locally to confirm. Therefore, I will merge it to trunk and 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13950: Fix resource leak in error scenarios [kafka]

2023-12-21 Thread via GitHub


divijvaidya commented on PR #12228:
URL: https://github.com/apache/kafka/pull/12228#issuecomment-1866183063

   Unrelated test failures since they are known to be flaky as per 
https://ge.apache.org/scans/tests?search.rootProjectNames=kafka=trunk=Europe%2FBerlin
 
   ```
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicateFromLatest__/)
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_17_and_Scala_2_13___testReplicateFromLatest__/)
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOffsetTranslationBehindReplicationFlow__/)
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationSSLTest/Build___JDK_17_and_Scala_2_13___testReplicateFromLatest__/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest___2/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.api.PlaintextConsumerTest.testShrinkingTopicSubscriptions(String, 
String).quorum=kraft+kip848.groupProtocol=consumer](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/kafka.api/PlaintextConsumerTest/Build___JDK_11_and_Scala_2_13___testShrinkingTopicSubscriptions_String__String__quorum_kraft_kip848_groupProtocol_consumer/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_11_and_Scala_2_13___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__kraft/)
   [Build / JDK 8 and Scala 2.12 / 
kafka.api.SaslSslConsumerTest.testCoordinatorFailover(String, 
String).quorum=zk.groupProtocol=generic](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/kafka.api/SaslSslConsumerTest/Build___JDK_8_and_Scala_2_12___testCoordinatorFailover_String__String__quorum_zk_groupProtocol_generic/)
   [Build / JDK 8 and Scala 2.12 / 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_8_and_Scala_2_12___testListenerConnectionRateLimitWhenActualRateAboveLimit__/)
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldHaveSamePositionBoundActiveAndStandBy/)
   [Build / JDK 8 and Scala 2.12 / 

[jira] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-12-21 Thread Arpit Goyal (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15388 ]


Arpit Goyal deleted comment on KAFKA-15388:
-

was (Author: JIRAUSER301926):
[~showuon] [~satish.duggana] [~christo_lolov] Any suggestion how to fetch the 
higher segment for a particular offset from remote , the same way we do in 
local storage ?

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Blocker
> Fix For: 3.7.0
>
> Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, 
> tieredtopicloglist.png
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offset from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offset after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15241) Compute tiered offset by keeping the respective epochs in scope.

2023-12-21 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15241:
---
Fix Version/s: 3.7.0

> Compute tiered offset by keeping the respective epochs in scope.
> 
>
> Key: KAFKA-15241
> URL: https://issues.apache.org/jira/browse/KAFKA-15241
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
> Fix For: 3.7.0
>
>
> This is a followup on the discussion 
> [thread|https://github.com/apache/kafka/pull/14004#discussion_r1268911909]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-12-21 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799399#comment-17799399
 ] 

Arpit Goyal commented on KAFKA-15388:
-

[~enether] [~satish.duggana] [~divijvaidya] [~enether] I am currently working 
on the fix. Hopefully will create a PR by Saturday.

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Blocker
> Fix For: 3.7.0
>
> Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, 
> tieredtopicloglist.png
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offset from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offset after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] test [kafka]

2023-12-21 Thread via GitHub


dajac opened a new pull request, #15058:
URL: https://github.com/apache/kafka/pull/15058

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-21 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Fix Version/s: 3.7.0

> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.7.0
>
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received, the revived task is restoring.
> 4. When the sync is received, the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which it 
> was revoked, reads the checkpoint, and starts restoring. (It might be enough if 
> the task moves to a stream thread on the same Streams client that shares the 
> same state directory.)
> 6. The state of the task misses some records.
> To mitigate the issue one can restart the stream thread and delete the 
> state on disk. After that the state restores completely from the changelog 
> topic and the state does not miss any records anymore.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset that the record collector stored when it sent the records to the 
> changelog topic. However, since in step 2 the state directory is wiped out, 
> the state does not contain those records anymore. It only contains the 
> records that it restored in step 3, which might be fewer. The root cause of 
> this is that the offsets in the record collector are not cleaned up when the 
> record collector is closed. 
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.
> The repro can be started with
> {code}
> ./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x 
> spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
> {code}
> The repro writes records into a state store and tries to retrieve them again 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
>  It will throw an {{IllegalStateException}} if it cannot find a record in the 
> state 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
>  Once the offsets in the record collector are cleared on close 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
>  and 
> https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
>  the {{IllegalStateException}} does not occur anymore.
> In the logs you can check for 
> - {{Restore batch end offset is}} which are the restored offsets in the state.
> - {{task [0_1] Writing checkpoint:}} which are the written checkpoints.
> - {{task [0_1] Checkpointable offsets}} which show the offsets coming from 
> the sending records to the changelog topic 
> {{RestoreIntegrationTesttest-stateStore-changelog-1}}
> Always the last instances of these before the {{IllegalStateException}} is 
> thrown.
> You will see that the restored offsets are less than the offsets that are 
> written to the checkpoint. The offsets written to the checkpoint come from 
> the offsets stored when sending the records to the changelog topic.  
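
A minimal sketch of the fix direction implied above, namely clearing the collector's 
checkpointable offsets when the collector is closed. This is a simplified stand-in, 
not the real RecordCollectorImpl.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class CollectorOffsetsSketch {
    private final Map<Integer, Long> sentOffsetsByPartition = new HashMap<>();

    public void onRecordSent(int changelogPartition, long offset) {
        // Remember the offset of the last record sent to each changelog partition;
        // these offsets are later used to write the checkpoint file.
        sentOffsetsByPartition.put(changelogPartition, offset);
    }

    public Map<Integer, Long> checkpointableOffsets() {
        return new HashMap<>(sentOffsetsByPartition);
    }

    public void close() {
        // Assumption based on the description above: forget the sent offsets on close so
        // a later checkpoint cannot claim more progress than the wiped or partially
        // restored state actually contains.
        sentOffsetsByPartition.clear();
    }
}
{code}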



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15158) Add metrics for RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec

2023-12-21 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15158:
---
Labels: tiered-storage  (was: )

> Add metrics for RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, 
> BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec
> --
>
> Key: KAFKA-15158
> URL: https://issues.apache.org/jira/browse/KAFKA-15158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Gantigmaa Selenge
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Add the following metrics for better observability into the RemoteLog related 
> activities inside the broker.
> 1. RemoteWriteRequestsPerSec
> 2. RemoteDeleteRequestsPerSec
> 3. BuildRemoteLogAuxStateRequestsPerSec
>  
> These metrics will be calculated at the topic level (we can add them to 
> brokerTopicStats).
> -*RemoteWriteRequestsPerSec* will be marked on every call to 
> RemoteLogManager#-
> -copyLogSegmentsToRemote()- already covered by KAFKA-14953
>  
> *RemoteDeleteRequestsPerSec* will be marked on every call to 
> RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced 
> in [https://github.com/apache/kafka/pull/13561] 
> *BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to 
> ReplicaFetcherTierStateMachine#buildRemoteLogAuxState()
>  
> (Note: For all the above, add Error metrics as well such as 
> RemoteDeleteErrorPerSec)
> (Note: This requires a change in KIP-405 and hence, must be approved by KIP 
> author [~satishd] )
>  
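
A hypothetical per-topic sketch of where the new meters would be marked. The real 
implementation would add Yammer meters to BrokerTopicStats, so the class and method 
names below are placeholders.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class RemoteLogMetricsSketch {
    private final Map<String, LongAdder> remoteDeleteRequests = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> buildRemoteLogAuxStateRequests = new ConcurrentHashMap<>();

    // Marked on every call to RemoteLogManager#cleanupExpiredRemoteLogSegments().
    public void markRemoteDeleteRequest(String topic) {
        remoteDeleteRequests.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    // Marked on every call to ReplicaFetcherTierStateMachine#buildRemoteLogAuxState().
    public void markBuildRemoteLogAuxStateRequest(String topic) {
        buildRemoteLogAuxStateRequests.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }
}
{code}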



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14888: Added remote log segments retention functionality based on time and size. [kafka]

2023-12-21 Thread via GitHub


iit2009060 commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1433944692


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -761,11 +784,385 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional<RetentionSizeData> retentionSizeData;
+private final Optional<RetentionTimeData> retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+this.retentionSizeData = retentionSizeData;
+this.retentionTimeData = retentionTimeData;
+remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionSizeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size >= 0
+if (remainingBreachedSize > 0) {
+long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+if (remainingBytes >= 0) {
+remainingBreachedSize = remainingBytes;
+return true;
+}
+}
+
+return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);

Review Comment:
   @divijvaidya @satishd @showuon I went through the specific code and 
realised this does not actually impact the logic:
   1. While copying the remote segments, RemoteLogSegmentMetadata stores the 
end offset using the value from the next segment's base offset.
   
https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L693
   2. In my understanding it would be safe to use the same logic for historically 
compacted topics. 
   Let me know if my analysis is correct or not.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433900022


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##
@@ -266,6 +267,28 @@ CompletableFuture deleteOffsets(
 BufferSupplier bufferSupplier
 );
 
+/**
+ * Complete a transaction. This is called when the WriteTxnMarkers API is 
called

Review Comment:
   That makes sense. I also added a check in 
`GroupCoordinatorService.completeTransaction` to fail if another partition is 
received, as that is not expected. 
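
   A hypothetical illustration of that guard; the names, signature and error type are 
assumptions for illustration, not the actual `GroupCoordinatorService.completeTransaction` code:

```java
import java.util.concurrent.CompletableFuture;

public class CompleteTransactionGuardSketch {
    // Only the group coordinator's own topic is expected here; any other partition
    // is rejected with an exceptional future.
    public CompletableFuture<Void> completeTransaction(String topic, int partition) {
        if (!"__consumer_offsets".equals(topic)) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                "Unexpected partition " + topic + "-" + partition));
            return failed;
        }
        return CompletableFuture.completedFuture(null);
    }
}
```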



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433892655


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2432,7 +2477,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = controlRecords,
   requestLocal = requestLocal,
-  responseCallback = maybeSendResponseCallback(producerId, 
marker.transactionResult))
+  responseCallback = errors => {

Review Comment:
   Yes. There are existing unit tests and integration tests covering this path. 
Note that the tests I've added also exercise this part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433891089


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3018,6 +3020,223 @@ class KafkaApisTest {
   any())
   }
 
+  @Test

Review Comment:
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433890364


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -102,6 +241,31 @@ public long append(
 }
 }
 
+@Override
+public long appendEndTransactionMarker(
+TopicPartition tp,
+long producerId,
+short producerEpoch,
+int coordinatorEpoch,
+TransactionResult result
+) throws KafkaException {
+PartitionState state = partitionState(tp);
+state.lock.lock();
+try {
+state.entries.add(new LogControl(
+producerId,
+producerEpoch,
+coordinatorEpoch,
+result
+));
+state.endOffset += 1;
+if (autoCommit) commit(tp, state.endOffset);

Review Comment:
   It means that we advance the HWM immediately and trigger the 
`onHighWatermarkUpdated` callback if enabled.
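
   A minimal, hypothetical sketch of what that auto-commit step amounts to; this is not 
the actual InMemoryPartitionWriter code, which is only partially quoted above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public class HighWatermarkSketch {
    private final List<LongConsumer> listeners = new ArrayList<>();
    private long highWatermark = 0L;

    public void registerListener(LongConsumer onHighWatermarkUpdated) {
        listeners.add(onHighWatermarkUpdated);
    }

    // With autoCommit enabled this runs right after each append: the high watermark
    // advances immediately and every registered listener is notified.
    public void commit(long endOffset) {
        highWatermark = endOffset;
        listeners.forEach(l -> l.accept(highWatermark));
    }
}
```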



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433889262


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
##
@@ -33,10 +37,141 @@
  */
 public class InMemoryPartitionWriter implements PartitionWriter {
 
+public static class LogEntry {
+public static <T> LogEntry value(T value) {
+return new LogValue<>(value);
+}
+
+public static <T> LogEntry value(
+long producerId,
+short producerEpoch,
+T value
+) {
+return new LogValue<>(
+producerId,
+producerEpoch,
+value
+);
+}
+
+public static LogEntry control(
+long producerId,
+short producerEpoch,
+int coordinatorEpoch,
+TransactionResult result
+) {
+return new LogControl(
+producerId,
+producerEpoch,
+coordinatorEpoch,
+result
+);
+}
+}
+
+public static class LogValue<T> extends LogEntry {
+public final long producerId;
+public final short producerEpoch;
+public final T value;
+
+private LogValue(
+long producerId,
+short producerEpoch,
+T value
+) {
+this.producerId = producerId;
+this.producerEpoch = producerEpoch;
+this.value = value;
+}
+
+private LogValue(T value) {
+this(
+RecordBatch.NO_PRODUCER_ID,
+RecordBatch.NO_PRODUCER_EPOCH,
+value
+);
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;

Review Comment:
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433888169


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
 );
 }
 
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) 
throws ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Transactional write #1.
+CompletableFuture write1 = 
runtime.scheduleTransactionalWriteOperation(
+"write#1",
+TP,
+"transactional-id",
+100L,
+(short) 5,
+DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+);
+
+// Verify that the write is not committed yet.
+assertFalse(write1.isDone());
+
+// The last written offset is updated.
+assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator. They are stored in
+// the pending set for now.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+100L
+));
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+), writer.entries(TP));
+
+// Complete transaction #1.
+CompletableFuture complete1 = 
runtime.scheduleTransactionCompletion(
+"complete#1",
+TP,
+100L,
+(short) 5,
+10,
+result,
+DEFAULT_WRITE_TIMEOUT
+);
+
+// Verify that the completion is not committed yet.
+assertFalse(complete1.isDone());
+
+// The last written offset is updated.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator.
+if (result == TransactionResult.COMMIT) {
+// They are now in the records set if committed.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+} else {
+// Or they are gone if aborted.
+assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
+}
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, 
result)
+), writer.entries(TP));
+
+// Commit write #1.
+writer.commit(TP, 2);
+
+// The write is completed.
+assertTrue(write1.isDone());
+assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+// Commit completion #1.
+

Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433886514


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
 );
 }
 
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) 
throws ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Transactional write #1.
+CompletableFuture write1 = 
runtime.scheduleTransactionalWriteOperation(
+"write#1",
+TP,
+"transactional-id",
+100L,
+(short) 5,
+DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+);
+
+// Verify that the write is not committed yet.
+assertFalse(write1.isDone());
+
+// The last written offset is updated.
+assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator. They are stored in
+// the pending set for now.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+100L
+));
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+), writer.entries(TP));
+
+// Complete transaction #1.
+CompletableFuture complete1 = 
runtime.scheduleTransactionCompletion(
+"complete#1",
+TP,
+100L,
+(short) 5,
+10,
+result,
+DEFAULT_WRITE_TIMEOUT
+);
+
+// Verify that the completion is not committed yet.
+assertFalse(complete1.isDone());
+
+// The last written offset is updated.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator.
+if (result == TransactionResult.COMMIT) {
+// They are now in the records set if committed.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+} else {
+// Or they are gone if aborted.
+assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
+}
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, 
result)
+), writer.entries(TP));
+
+// Commit write #1.
+writer.commit(TP, 2);
+
+// The write is completed.
+assertTrue(write1.isDone());
+assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+// Commit completion #1.
+

Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]

2023-12-21 Thread via GitHub


dajac commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433877427


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
 );
 }
 
+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) 
throws ExecutionException, InterruptedException, TimeoutException {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = new MockPartitionWriter();
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+.build();
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Verify the initial state.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+// Transactional write #1.
+CompletableFuture write1 = 
runtime.scheduleTransactionalWriteOperation(
+"write#1",
+TP,
+"transactional-id",
+100L,
+(short) 5,
+DEFAULT_WRITE_TIMEOUT,
+state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+);
+
+// Verify that the write is not committed yet.
+assertFalse(write1.isDone());
+
+// The last written offset is updated.
+assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator. They are stored in
+// the pending set for now.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
+100L
+));
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+), writer.entries(TP));
+
+// Complete transaction #1.
+CompletableFuture complete1 = 
runtime.scheduleTransactionCompletion(
+"complete#1",
+TP,
+100L,
+(short) 5,
+10,
+result,
+DEFAULT_WRITE_TIMEOUT
+);
+
+// Verify that the completion is not committed yet.
+assertFalse(complete1.isDone());
+
+// The last written offset is updated.
+assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+// The last committed offset does not change.
+assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+// A new snapshot is created.
+assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+// Records have been replayed to the coordinator.
+if (result == TransactionResult.COMMIT) {
+// They are now in the records set if committed.
+assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
+} else {
+// Or they are gone if aborted.
+assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
+}
+
+// Records have been written to the log.
+assertEquals(Arrays.asList(
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, 
result)
+), writer.entries(TP));
+
+// Commit write #1.
+writer.commit(TP, 2);
+
+// The write is completed.
+assertTrue(write1.isDone());
+assertEquals("response1", write1.get(5, TimeUnit.SECONDS));

Review Comment:
   Yes, that's correct. The move from the 
