[GitHub] [kafka] showuon opened a new pull request, #13037: MINOR: make sure all partition info is propagated

2022-12-21 Thread GitBox


showuon opened a new pull request, #13037:
URL: https://github.com/apache/kafka/pull/13037

   Recently, we have had some flaky tests failing with:
   ```
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at kafka.utils.TestUtils$.describeTopic(TestUtils.scala:475)
    at kafka.utils.TestUtils$.topicHasSameNumPartitionsAndReplicationFactor(TestUtils.scala:484)
   ```
   The failing line is where we try to create a topic but get a `TopicExistsException`, so we want to make sure the existing topic has the expected partition count and replication factor. The test failed while describing this topic, getting an `UnknownTopicOrPartitionException`. That can happen when the topic is created but its metadata hasn't been propagated to all brokers yet. So, fix it by explicitly waiting for the partition info to be propagated to all brokers before verifying `topicHasSameNumPartitionsAndReplicationFactor`.
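   
   A minimal sketch of the "retry describe until the metadata propagates" idea (in Java rather than the Scala `TestUtils` change in this PR; the class name, timeout, and retry interval are made up for illustration):
   ```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public final class TopicMetadataWait {
    // Polls describeTopics until the topic's metadata is visible or the deadline passes.
    public static TopicDescription awaitTopicDescribed(Admin admin, String topic, long timeoutMs)
            throws InterruptedException, ExecutionException {
        long deadlineMs = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return admin.describeTopics(Collections.singletonList(topic))
                        .allTopicNames().get().get(topic);
            } catch (ExecutionException e) {
                boolean notPropagatedYet = e.getCause() instanceof UnknownTopicOrPartitionException;
                if (!notPropagatedYet || System.currentTimeMillis() > deadlineMs) {
                    throw e; // a different failure, or we ran out of time
                }
                Thread.sleep(100); // topic exists, but its metadata has not reached the broker yet
            }
        }
    }
}
   ```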
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] chia7712 commented on a diff in pull request #12979: KAFKA-14544 The "is-future" should be removed from metrics tags after future log becomes current log

2022-12-21 Thread GitBox


chia7712 commented on code in PR #12979:
URL: https://github.com/apache/kafka/pull/12979#discussion_r1055146924


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1079,6 +1079,9 @@ class LogManager(logDirs: Seq[File],
   if (destLog == null)
     throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
+  // the metrics tags still contain "future", so we have to remove it.
+  // we will add the metrics back after sourceLog removes them
+  destLog.removeLogMetrics()
   destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
   destLog.updateHighWatermark(sourceLog.highWatermark)

Review Comment:
   >  it'll be helpful to have metrics reporting it
   
   Makes sense. Will copy that.






[jira] [Comment Edited] (KAFKA-14470) Move log layer to storage module

2022-12-21 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651098#comment-17651098
 ] 

Satish Duggana edited comment on KAFKA-14470 at 12/22/22 6:54 AM:
--

[~ijuma] [~mimaison] The `LogOffsetMetadata` refactoring to move it to the storage module is being done as part of https://issues.apache.org/jira/browse/KAFKA-14480. I'm glad to raise a PR against https://issues.apache.org/jira/browse/KAFKA-14543 for that specific change if you have not yet started working on it.


was (Author: satish.duggana):
[~ijuma] [~mimaison] `LogOffsetMetadata` refactoring to move to storage module 
is being done as part of https://issues.apache.org/jira/browse/KAFKA-14480. Let 
me know if I can raise PR against 
https://issues.apache.org/jira/browse/KAFKA-14543 for that specific change.

> Move log layer to storage module
> 
>
> Key: KAFKA-14470
> URL: https://issues.apache.org/jira/browse/KAFKA-14470
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>
> We introduced the `storage` module as part of KIP-405, but the existing log 
> layer remains in the `core` module. Moving the log layer to the `storage` 
> module would be another step towards improved modularity and build times 
> (similar to `metadata`, `raft` and `group-coordinator`).
> We should do this in an incremental manner to make the code review process 
> easier. I will create separate tasks, each one mapping to one pull request. 
> In order to understand the feasibility, I tackled a few of the tasks myself.
> Help from the community is appreciated for the unassigned tasks, but it 
> probably makes sense to do that after the initial PRs have been submitted.





[jira] [Commented] (KAFKA-14470) Move log layer to storage module

2022-12-21 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651098#comment-17651098
 ] 

Satish Duggana commented on KAFKA-14470:


[~ijuma] [~mimaison] `LogOffsetMetadata` refactoring to move to storage module 
is being done as part of https://issues.apache.org/jira/browse/KAFKA-14480. Let 
me know if I can raise PR against 
https://issues.apache.org/jira/browse/KAFKA-14543 for that specific change.

> Move log layer to storage module
> 
>
> Key: KAFKA-14470
> URL: https://issues.apache.org/jira/browse/KAFKA-14470
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>
> We introduced the `storage` module as part of KIP-405, but the existing log 
> layer remains in the `core` module. Moving the log layer to the `storage` 
> module would be another step towards improved modularity and build times 
> (similar to `metadata`, `raft` and `group-coordinator`).
> We should do this in an incremental manner to make the code review process 
> easier. I will create separate tasks, each one mapping to one pull request. 
> In order to understand the feasibility, I tackled a few of the tasks myself.
> Help from the community is appreciated for the unassigned tasks, but it 
> probably makes sense to do that after the initial PRs have been submitted.





[GitHub] [kafka] emilnkrastev commented on pull request #11818: KAFKA-12558: Do not prematurely mutate partiton state and provide con…

2022-12-21 Thread GitBox


emilnkrastev commented on PR #11818:
URL: https://github.com/apache/kafka/pull/11818#issuecomment-1362465204

   @gharris1727 The PR is updated. Could you please take a look?





[jira] [Comment Edited] (KAFKA-14470) Move log layer to storage module

2022-12-21 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650947#comment-17650947
 ] 

Ismael Juma edited comment on KAFKA-14470 at 12/22/22 6:03 AM:
---

[~mimaison] Dependencies of the log layer classes would have to be moved too, 
yes. In some cases, it may warrant a separate ticket while in others it can be 
bundled with one of the tickets that already exists (left to the discretion of 
the contributor).

Regarding kraft, yes there are some opportunities once the log layer has been 
completely moved to the storage layer. I was having a discussion with Jose 
yesterday about this and will file a ticket to capture it.


was (Author: ijuma):
[~mimaison] Dependencies of the log layer classes would have to be moved too, 
yes. In some cases, it may warrant a separate ticket while in others it can be 
bundled with one of the tickets that already exists (left to the discretion of 
the contributor).

Regarding kraft, yes there are some opportunities once the log layer has been 
completed moved to the storage layer. I was having a discussion with Jose 
yesterday about this and will file a ticket to capture it.

> Move log layer to storage module
> 
>
> Key: KAFKA-14470
> URL: https://issues.apache.org/jira/browse/KAFKA-14470
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>
> We introduced the `storage` module as part of KIP-405, but the existing log 
> layer remains in the `core` module. Moving the log layer to the `storage` 
> module would be another step towards improved modularity and build times 
> (similar to `metadata`, `raft` and `group-coordinator`).
> We should do this in an incremental manner to make the code review process 
> easier. I will create separate tasks, each one mapping to one pull request. 
> In order to understand the feasibility, I tackled a few of the tasks myself.
> Help from the community is appreciated for the unassigned tasks, but it 
> probably makes sense to do that after the initial PRs have been submitted.





[GitHub] [kafka] finaut commented on pull request #8066: KAFKA-4090: Validate SSL connection in client

2022-12-21 Thread GitBox


finaut commented on PR #8066:
URL: https://github.com/apache/kafka/pull/8066#issuecomment-1362433187

   > I documented my work on the ticket as well. It got very little attention 
from the Kafka Dev Team and seems to be pretty out of date now (merge 
conflicts). Will need someone internal to the project to take this baton and 
run with it.
   > 
   > https://issues.apache.org/jira/browse/KAFKA-4090
   
   It's hard to understand why such a serious error is being ignored. It has been reported many times in different forms. Without a fix for this, I can't see how it's possible to consider Kafka an enterprise-ready technology.





[GitHub] [kafka] showuon commented on a diff in pull request #13036: KAFKA-14534: Reduce flakiness in TransactionsExpirationTest

2022-12-21 Thread GitBox


showuon commented on code in PR #13036:
URL: https://github.com/apache/kafka/pull/13036#discussion_r1055097820


##
core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala:
##
@@ -128,10 +128,13 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
 
 producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
 producer.flush()
 
+TestUtils.waitUntilTrue(() => producerState.nonEmpty, s"Producer IDs for topic1 did not propagate quickly")
+
 // Ensure producer IDs are added.
 val pState = producerState
-assertEquals(1, pState.size)
-val oldProducerId = pState(0).producerId
+assertEquals(1, pState.size, s"No producer visible via admin api")

Review Comment:
   I don't think the error message makes sense here, since we already wait until `producerState` is not empty. The error here would be that the size is different, not that no producer is visible via the admin API. Does that make sense?



##
core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala:
##
@@ -149,13 +152,18 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
 
 producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, "3", "3", willBeCommitted = true))
 producer.commitTransaction()
 
+TestUtils.waitUntilTrue(() => producerState.nonEmpty, s"Producer IDs for topic1 did not propagate quickly")
+
 // Producer IDs should repopulate.
 val pState2 = producerState
-assertEquals(1, pState2.size)
-val newProducerId = pState2(0).producerId
+assertEquals(1, pState2.size, "No producer visible via admin api")
+val newProducerId = pState2.head.producerId
+val newProducerEpoch = pState2.head.producerEpoch
 
-// Producer IDs should be the same.
+// Because the transaction IDs outlive the producer IDs, creating a producer with the same transactional id
+// soon after the first will re-use the same producerId, while bumping the epoch to indicate that they are distinct.
 assertEquals(oldProducerId, newProducerId)
+assertEquals(oldProducerEpoch + 1, newProducerEpoch)

Review Comment:
   Thanks for adding one more verification and the comment.






[GitHub] [kafka] showuon commented on a diff in pull request #12979: KAFKA-14544 The "is-future" should be removed from metrics tags after future log becomes current log

2022-12-21 Thread GitBox


showuon commented on code in PR #12979:
URL: https://github.com/apache/kafka/pull/12979#discussion_r1055056767


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1079,6 +1079,9 @@ class LogManager(logDirs: Seq[File],
   if (destLog == null)
     throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
+  // the metrics tags still contain "future", so we have to remove it.
+  // we will add the metrics back after sourceLog removes them
+  destLog.removeLogMetrics()
   destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
   destLog.updateHighWatermark(sourceLog.highWatermark)

Review Comment:
   Do you think we should remove the log metrics after destLog gets renamed? I'm thinking that if we get stuck for some time during `renameDir`, it would be helpful to have metrics reporting it. WDYT?






[GitHub] [kafka] showuon commented on a diff in pull request #12979: KAFKA-14544 The "is-future" should be removed from metrics tags after future log becomes current log

2022-12-21 Thread GitBox


showuon commented on code in PR #12979:
URL: https://github.com/apache/kafka/pull/12979#discussion_r1055056767


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1079,6 +1079,9 @@ class LogManager(logDirs: Seq[File],
   if (destLog == null)
     throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
+  // the metrics tags still contain "future", so we have to remove it.
+  // we will add the metrics back after sourceLog removes them
+  destLog.removeLogMetrics()
   destLog.renameDir(UnifiedLog.logDirName(topicPartition), true)
   destLog.updateHighWatermark(sourceLog.highWatermark)

Review Comment:
   Do you think we should remove the log metrics after destLog gets renamed? I'm thinking that if we get stuck for some time during `renameDir`, it would be helpful to have metrics reporting it. WDYT?






[GitHub] [kafka] showuon merged pull request #13031: MINOR: increase connectionMaxIdleMs to make test reliable

2022-12-21 Thread GitBox


showuon merged PR #13031:
URL: https://github.com/apache/kafka/pull/13031





[GitHub] [kafka] ijuma merged pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

2022-12-21 Thread GitBox


ijuma merged PR #13012:
URL: https://github.com/apache/kafka/pull/13012





[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

2022-12-21 Thread GitBox


ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1362265880

   JDK 17 build passed, other build failures are unrelated.





[GitHub] [kafka] vvcephei merged pull request #13034: MINOR: remove onChange call in stream assignor assign method

2022-12-21 Thread GitBox


vvcephei merged PR #13034:
URL: https://github.com/apache/kafka/pull/13034





[GitHub] [kafka] vvcephei commented on pull request #13034: MINOR: remove onChange call in stream assignor assign method

2022-12-21 Thread GitBox


vvcephei commented on PR #13034:
URL: https://github.com/apache/kafka/pull/13034#issuecomment-1362253175

   None of the failing tests were from Streams:
   
   ```
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplication() | 2 min 11 sec | 1
   Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=kraft | 10 sec | 1
   Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft | 4.5 sec | 1
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.Tls12SelectorTest.testCloseOldestConnection() | 0.59 sec | 1
   Build / JDK 17 and Scala 2.13 / kafka.admin.ReassignPartitionsIntegrationTest.testCancellation(String).quorum=kraft | 16 sec | 1
   Build / JDK 17 and Scala 2.13 / kafka.api.ProducerIdExpirationTest.testProducerIdExpirationWithNoTransactions(String).quorum=zk | 5.4 sec | 1
   Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String).quorum=kraft

   Full test report: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13034/1/
   ```
   
   I'll go ahead and merge.





[GitHub] [kafka] mumrah commented on a diff in pull request #13027: MINOR Send ZK broker epoch in registration

2022-12-21 Thread GitBox


mumrah commented on code in PR #13027:
URL: https://github.com/apache/kafka/pull/13027#discussion_r1054964559


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -359,6 +354,11 @@ class KafkaServer(
 val brokerInfo = createBrokerInfo
 val brokerEpoch = zkClient.registerBroker(brokerInfo)
 
+lifecycleManager = new BrokerLifecycleManager(config,
+  time,
+  threadNamePrefix,
+  zkBrokerEpoch = Some(brokerEpoch))

Review Comment:
   Good catch. We do get new broker epochs after the ZK session re-initializes. I'll change this to use the same pattern we use in AlterPartitionManager.






[jira] [Created] (KAFKA-14546) Allow Partitioner to return -1 to indicate default partitioning

2022-12-21 Thread James Olsen (Jira)
James Olsen created KAFKA-14546:
---

 Summary: Allow Partitioner to return -1 to indicate default 
partitioning
 Key: KAFKA-14546
 URL: https://issues.apache.org/jira/browse/KAFKA-14546
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 3.3.1
Reporter: James Olsen


Prior to KIP-794 it was possible to create a custom Partitioner that could delegate to the DefaultPartitioner. DefaultPartitioner has been deprecated, so we can now only delegate to BuiltInPartitioner.partitionForKey, which does not handle a non-keyed message. Hence there is now no mechanism for a custom Partitioner to fall back to default partitioning, e.g. for the non-keyed sticky case.

I would like to propose that KafkaProducer.partition(...) not throw 
IllegalArgumentException if the Partitioner returns 
RecordMetadata.UNKNOWN_PARTITION and instead continue with the default 
behaviour.  Maybe with a configuration flag to enable this behaviour so as not 
to break existing expectations?
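
To illustrate the kind of delegating Partitioner this would enable, here is a hedged sketch (the topic name and routing rule are hypothetical, and the UNKNOWN_PARTITION fallback relies on the behaviour proposed above, not on the current producer behaviour):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;

public class TopicAwarePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Hypothetical topic-specific rule: route keyed records of one topic ourselves.
        if ("orders".equals(topic) && keyBytes != null) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            return (java.util.Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }
        // No custom requirement for this topic: ask the producer to apply its default
        // (sticky) partitioning; this relies on the proposed behaviour, not the current one.
        return RecordMetadata.UNKNOWN_PARTITION;
    }

    @Override
    public void close() { }
}
{code}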

Why was Partitioner delegation with default fallback useful?
 # A single Producer can be used to write to multiple Topics where each Topic 
may have different partitioning requirements.  The Producer can only have a 
single Partitioner so the Partitioner needs to be able to switch behaviour 
based on the Topic, including the need to fallback to default behaviour if a 
given Topic does not have a custom requirement.
 # Multiple services may need to produce to the same Topic and these services 
may be authored by different teams.  A single custom Partitioner that 
encapsulates all Topic specific partitioning logic can be used by all teams at 
all times for all Topics ensuring that mistakes are not made.





[GitHub] [kafka] akhileshchg commented on a diff in pull request #13028: KAFKA-14458: Introduce RPC support during ZK migration

2022-12-21 Thread GitBox


akhileshchg commented on code in PR #13028:
URL: https://github.com/apache/kafka/pull/13028#discussion_r1054933363


##
metadata/src/main/java/org/apache/kafka/image/ClusterImage.java:
##
@@ -34,9 +34,15 @@ public final class ClusterImage {
 public static final ClusterImage EMPTY = new 
ClusterImage(Collections.emptyMap());
 
 private final Map brokers;
+private final Map zkBrokers;
 
 public ClusterImage(Map brokers) {
 this.brokers = Collections.unmodifiableMap(brokers);
+this.zkBrokers = Collections.unmodifiableMap(brokers

Review Comment:
   I thought about this. Have to not strong preference since methods based on 
zkBrokers should not be called often. Will change.



##
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##
@@ -359,13 +400,15 @@ abstract class 
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   throw new IllegalStateException("Controller to broker state change 
requests batch is not empty while creating a " +
 s"new one. Some UpdateMetadata state changes to brokers 
$updateMetadataRequestBrokerSet with partition info " +
 s"$updateMetadataRequestPartitionInfoMap might be lost ")
+metadataInstance = metadataProvider()

Review Comment:
   Yes.



##
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##
@@ -206,6 +206,9 @@ public void replay(ApiMessage record) {
  * updating the highest offset and epoch.
  */
 break;
+case ZK_MIGRATION_STATE_RECORD:
+// TODO handle this
+break;

Review Comment:
   Spill over. Will remove this.






[GitHub] [kafka] cmccabe merged pull request #13029: MINOR: Add zk migration field to the ApiVersionsResponse

2022-12-21 Thread GitBox


cmccabe merged PR #13029:
URL: https://github.com/apache/kafka/pull/13029





[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2022-12-21 Thread GitBox


jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1054893924


##
clients/src/main/java/org/apache/kafka/common/errors/FencedMemberEpoch.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FencedMemberEpoch extends ApiException {

Review Comment:
   I feel like we typically do that for classes that extend ApiException. (Same 
on other classes changed in this file.)






[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2022-12-21 Thread GitBox


jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1054893630


##
clients/src/main/java/org/apache/kafka/common/errors/FencedMemberEpoch.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FencedMemberEpoch extends ApiException {

Review Comment:
   Do we want to add the word Exception to the end of the class name and file?






[GitHub] [kafka] jolshan commented on pull request #12901: KAFKA-14367; Add `TnxOffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on PR #12901:
URL: https://github.com/apache/kafka/pull/12901#issuecomment-1362182388

   reminder to rebase here :) 





[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054880470


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java:
##
@@ -227,6 +227,24 @@ public boolean requireStable() {
 return data.requireStable();
 }
 
+public List groups() {
+if (version() >= 8) {
+return data.groups();
+} else {
+OffsetFetchRequestData.OffsetFetchRequestGroup group =
+new 
OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());
+
+data.topics().forEach(topic -> {

Review Comment:
   I feel like the stream-plus-collect-to-list is extraneous, and I'm not sure it's much more intuitive.






[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054880470


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java:
##
@@ -227,6 +227,24 @@ public boolean requireStable() {
 return data.requireStable();
 }
 
+public List groups() {
+if (version() >= 8) {
+return data.groups();
+} else {
+OffsetFetchRequestData.OffsetFetchRequestGroup group =
+new 
OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());
+
+data.topics().forEach(topic -> {

Review Comment:
   Is this doing the same thing? 






[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica

2022-12-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651012#comment-17651012
 ] 

Jun Rao commented on KAFKA-9087:


[~chia7712]: Thanks for the update. About the race condition, I am still wondering how we got into that state. Let's say ReplicaAlterLogDirsThread is about to append the fetched data to a future log when the log's dir is being changed. In this case, we will first remove the partition from the partitionState in ReplicaAlterLogDirsThread, recreate the future log, and add the partition to ReplicaAlterLogDirsThread again. If ReplicaAlterLogDirsThread then tries to append the old fetched data, it should fail the following check, since the fetch offset in the fetch request and the currentFetchState will be different.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L347]

 

So, ReplicaAlterLogDirsThread is supposed to ignore the old fetched data and 
fetch again using the new fetch offset. I am wondering why that didn't happen.
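
For reference, a simplified illustration of that guard (plain Java with invented names, not the actual Scala AbstractFetcherThread code): fetched data is only appended if the offset it was fetched at still matches the partition's current fetch state, so a stale response is dropped and the thread re-fetches from the new offset.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StaleFetchGuardSketch {
    // Current fetch offset per partition; reset when the partition is removed and re-added
    // (e.g. when the future log is recreated after a log-dir change).
    private final Map<String, Long> currentFetchOffset = new ConcurrentHashMap<>();

    void onFetchResponse(String partition, long fetchOffsetOfRequest, long nextOffsetAfterAppend) {
        Long current = currentFetchOffset.get(partition);
        if (current == null || current != fetchOffsetOfRequest) {
            // Stale response: the partition's state changed after the fetch was issued, so drop it.
            return;
        }
        // append the fetched records here (elided), then advance the fetch state
        currentFetchOffset.put(partition, nextOffsetAfterAppend);
    }
}
{code}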

> ReplicaAlterLogDirs stuck and restart fails with 
> java.lang.IllegalStateException: Offset mismatch for the future replica
> 
>
> Key: KAFKA-9087
> URL: https://issues.apache.org/jira/browse/KAFKA-9087
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Gregory Koshelev
>Priority: Major
>
> I've started multiple replica movements between log directories and some 
> partitions were stuck. After the restart of the broker I've got exception in 
> server.log:
> {noformat}
> [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to 
> (kafka.server.ReplicaAlterLogDirsThread)
>  org.apache.kafka.common.KafkaException: Error processing data for partition 
> metrics_timers-35 offset 4224887
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Caused by: java.lang.IllegalStateException: Offset mismatch for the future 
> replica metrics_timers-35: fetched offset = 4224887, log end offset = 0.
>  at 
> kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311)
>  ... 16 more
>  [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
> {noformat}
> Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix 
> the problem. To fix it I've stopped the broker and remove all the stuck 
> future partitions.
> Detailed log below
> {noformat}
> [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest 
> offset in the log is 4224886 (kafka.log.Log)
> [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Loading producer state till offset 4224887 with 
> message format version 2 (kafka.log.Log)
> [2019-06-11 12:21:34,980] INFO [ProducerStateManager 
> partition=metrics_timers-35] Loading producer state from snapshot file 
> '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' 
> (kafka.log.ProducerStateManager)
> [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Completed load of log with 1 

[jira] [Commented] (KAFKA-14545) MirrorCheckpointTask throws NullPointerException when group hasn't consumed from some partitions

2022-12-21 Thread Chris Solidum (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651011#comment-17651011
 ] 

Chris Solidum commented on KAFKA-14545:
---

Adding a check for null values in [MirrorCheckpointTask.checkpointsForGroup|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L172] seems like the simplest fix for this issue.
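
A hedged sketch of that guard (the method shape, generic types, and names are simplified stand-ins, not the actual MirrorCheckpointTask code): skip entries the group never consumed (negative committed offsets) and drop any checkpoint that comes back null, instead of letting it surface as a NullPointerException.
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CheckpointFilterSketch {

    interface CheckpointFactory<C> {
        // May return null when no checkpoint can be computed for the partition.
        C checkpoint(TopicPartition tp, OffsetAndMetadata committed);
    }

    static <C> List<C> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> groupOffsets,
                                           CheckpointFactory<C> factory) {
        return groupOffsets.entrySet().stream()
            .filter(e -> e.getValue() != null && e.getValue().offset() >= 0) // skip partitions the group never consumed
            .map(e -> factory.checkpoint(e.getKey(), e.getValue()))
            .filter(Objects::nonNull) // the suggested null check
            .collect(Collectors.toList());
    }
}
{code}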

> MirrorCheckpointTask throws NullPointerException when group hasn't consumed 
> from some partitions
> 
>
> Key: KAFKA-14545
> URL: https://issues.apache.org/jira/browse/KAFKA-14545
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.3.0
>Reporter: Chris Solidum
>Assignee: Chris Solidum
>Priority: Major
>
> MirrorTaskConnector looks like it's throwing a NullPointerException when a 
> consumer group hasn't consumed from all topics from a partition. This blocks 
> the syncing of consumer group offsets to the target cluster. The stacktrace 
> and error message is as follows:
> {code:java}
> WARN Failure polling consumer state for checkpoints. 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask)
> at java.base/java.lang.Thread.run(Thread.java:829)Dec 20
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
> at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346)
> at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452)
> at 
> org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142)
> at 
> org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160)
> at 
> org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177)
> at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at 
> java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
> at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
> at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at 
> org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174)
> at 
> org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191)
> java.lang.NullPointerException
>  {code}
> This seems to happen if the OffsetFetch call returns a 
> OffsetFetchPartitionResponsePartition with a negative commitedOffset. 
> Mirrormaker should handle this case more gracefully and still be sync over 
> consumer offsets for non negative partitions.
> {code:java}
> TRACE [AdminClient clientId=adminclient-55] 
> Call(callName=offsetFetch(api=OFFSET_FETCH), deadlineMs=1671657869539, 
> tries=0, nextAllowedTryMs=0) got response 
> OffsetFetchResponseData(throttleTimeMs=0, 
> topics=[OffsetFetchResponseTopic(name='XXX', 
> partitions=[OffsetFetchResponsePartition(partitionIndex=1, 
> committedOffset=866, committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=0, committedOffset=865, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=9, committedOffset=868, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=14, committedOffset=870, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=5, committedOffset=803, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=8, committedOffset=881, 
> committedLeaderEpoch=-1, metadata='', errorCode=0), 
> OffsetFetchResponsePartition(partitionIndex=11, 

[jira] [Updated] (KAFKA-14545) MirrorCheckpointTask throws NullPointerException when group hasn't consumed from some partitions

2022-12-21 Thread Chris Solidum (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Solidum updated KAFKA-14545:
--
Description: 
MirrorTaskConnector looks like it's throwing a NullPointerException when a consumer group hasn't consumed from all partitions of a topic. This blocks the syncing of consumer group offsets to the target cluster. The stacktrace and error message are as follows:
{code:java}
WARN Failure polling consumer state for checkpoints. 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask)
at java.base/java.lang.Thread.run(Thread.java:829)Dec 20
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346)
at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177)
at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at 
java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191)
java.lang.NullPointerException
 {code}
This seems to happen if the OffsetFetch call returns an OffsetFetchResponsePartition with a negative committedOffset. MirrorMaker should handle this case more gracefully and still sync consumer offsets for the partitions with non-negative committed offsets.
{code:java}
TRACE [AdminClient clientId=adminclient-55] 
Call(callName=offsetFetch(api=OFFSET_FETCH), deadlineMs=1671657869539, tries=0, 
nextAllowedTryMs=0) got response OffsetFetchResponseData(throttleTimeMs=0, 
topics=[OffsetFetchResponseTopic(name='XXX', 
partitions=[OffsetFetchResponsePartition(partitionIndex=1, committedOffset=866, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=0, committedOffset=865, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=9, committedOffset=868, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=14, committedOffset=870, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=5, committedOffset=803, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=8, committedOffset=881, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=11, committedOffset=-1, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=4, committedOffset=872, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=7, committedOffset=863, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=10, committedOffset=835, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=13, committedOffset=860, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=12, committedOffset=885, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=3, committedOffset=771, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=6, committedOffset=859, 
committedLeaderEpoch=-1, metadata='', errorCode=0), 
OffsetFetchResponsePartition(partitionIndex=2, 

[jira] [Created] (KAFKA-14545) MirrorCheckpointTask throws NullPointerException when group hasn't consumed from some partitions

2022-12-21 Thread Chris Solidum (Jira)
Chris Solidum created KAFKA-14545:
-

 Summary: MirrorCheckpointTask throws NullPointerException when 
group hasn't consumed from some partitions
 Key: KAFKA-14545
 URL: https://issues.apache.org/jira/browse/KAFKA-14545
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.3.0
Reporter: Chris Solidum
Assignee: Chris Solidum


MirrorTaskConnector looks like it's throwing a NullPointerException when a consumer group hasn't consumed from all partitions of a topic. This blocks the syncing of consumer group offsets to the target cluster. The stacktrace and error message are as follows:
{code:java}
WARN Failure polling consumer state for checkpoints. 
(org.apache.kafka.connect.mirror.MirrorCheckpointTask)
at java.base/java.lang.Thread.run(Thread.java:829)Dec 20
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:346)
at 
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:452)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.poll(MirrorCheckpointTask.java:142)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.sourceRecordsForGroup(MirrorCheckpointTask.java:160)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpointsForGroup(MirrorCheckpointTask.java:177)
at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at 
java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.lambda$checkpointsForGroup$2(MirrorCheckpointTask.java:174)
at 
org.apache.kafka.connect.mirror.MirrorCheckpointTask.checkpoint(MirrorCheckpointTask.java:191)
java.lang.NullPointerException
 {code}





[GitHub] [kafka] gharris1727 commented on pull request #13036: KAFKA-14534: Reduce flakiness in TransactionsExpirationTest

2022-12-21 Thread GitBox


gharris1727 commented on PR #13036:
URL: https://github.com/apache/kafka/pull/13036#issuecomment-1362127508

   @jolshan Could you take a look at this test stabilization?





[GitHub] [kafka] gharris1727 opened a new pull request, #13036: KAFKA-14534: Reduce flakiness in TransactionsExpirationTest

2022-12-21 Thread GitBox


gharris1727 opened a new pull request, #13036:
URL: https://github.com/apache/kafka/pull/13036

   This test asserts that after a producerId expires and before a transactionId 
expires, the producerId is reused in a subsequent epoch.
   The transactionId expiration time used in this test was too short, and a 
race condition between the two expirations was occasionally causing a new 
producerId to be returned without an epoch bump.
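   
   For context, the relationship the test relies on is that the transactional id must outlive the producer id. A sketch of broker overrides expressing that relationship (the values are illustrative only, not the ones this PR sets):
   ```java
import java.util.Properties;

// Illustrative broker overrides only; the actual values used by TransactionsExpirationTest
// live in the test itself and are not reproduced here.
public class ExpirationConfigSketch {
    public static Properties brokerOverrides() {
        Properties props = new Properties();
        props.put("producer.id.expiration.ms", "5000");        // producer IDs expire quickly (KIP-854 config)
        props.put("transactional.id.expiration.ms", "60000");  // transactional IDs must outlive producer IDs
        return props;
    }
}
   ```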
   
   Signed-off-by: Greg Harris 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] philipnee commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


philipnee commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054847935


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java:
##
@@ -227,6 +227,24 @@ public boolean requireStable() {
 return data.requireStable();
 }
 
+public List groups() {
+if (version() >= 8) {
+return data.groups();
+} else {

Review Comment:
   The else block is not needed.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1338,27 +1337,22 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
* Handle an offset fetch request
*/
-  def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+  def handleOffsetFetchRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val version = request.header.apiVersion
 if (version == 0) {
-  // reading offsets from ZK
-  handleOffsetFetchRequestV0(request)
-} else if (version >= 1 && version <= 7) {
-  // reading offsets from Kafka
-  handleOffsetFetchRequestBetweenV1AndV7(request)
+  handleOffsetFetchRequestFromZookeeper(request)

Review Comment:
   I think it can be more idiomatic and cleaner if we rewrite it using match-case:
   ```
   version match {
     case 0 => handle...
     case ...
   }
   ```



##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java:
##
@@ -227,6 +227,24 @@ public boolean requireStable() {
 return data.requireStable();
 }
 
+public List groups() {
+if (version() >= 8) {
+return data.groups();
+} else {
+OffsetFetchRequestData.OffsetFetchRequestGroup group =
+new 
OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());
+
+data.topics().forEach(topic -> {

Review Comment:
   Do you think this can be a bit more intuitive?
   ```
   group.topics.addAll(
     data.topics().stream().map(topic -> new Offset...).collect(Collectors.toList())
   )
   ```



##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
 this.error = null;
 }
 
+public OffsetFetchResponse(List groups, short 
version) {
+super(ApiKeys.OFFSET_FETCH);
+data = new OffsetFetchResponseData();
+
+if (version >= 8) {
+data.setGroups(groups);
+error = null;
+
+for (OffsetFetchResponseGroup group : data.groups()) {
+this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+}

Review Comment:
   we could just return here to avoid the else. also save 1 indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-12-21 Thread GitBox


philipnee commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1054670620


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -1933,11 +1943,79 @@ private Map 
topicPartitionTags(TopicPartition tp) {
 }
 }
 
+// Visible for testing
+void maybeCloseFetchSessions(final Timer timer) {
+final Cluster cluster = metadata.fetch();
+final List> requestFutures = new 
ArrayList<>();
+for (final Map.Entry entry : 
sessionHandlers.entrySet()) {
+final FetchSessionHandler sessionHandler = entry.getValue();
+// set the session handler to notify close. This will set the next 
metadata request to send close message.
+sessionHandler.notifyClose();
+
+final int sessionId = sessionHandler.sessionId();
+final Integer fetchTargetNodeId = entry.getKey();
+// FetchTargetNode may not be available as it may have 
disconnected the connection. In such cases, we will
+// skip sending the close request.
+final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+log.debug("Skip sending close session request to broker {} 
since it is not reachable", fetchTarget);
+continue;
+}
+
+final RequestFuture responseFuture = 
sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+responseFuture.addListener(new 
RequestFutureListener() {
+@Override
+public void onSuccess(ClientResponse value) {
+log.debug("Successfully sent a close message for fetch 
session: {} to node: {}", sessionId, fetchTarget);

Review Comment:
   Do we really need to log every successful close? I think logging only the 
failed ones would be sufficient.



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -2403,17 +2404,40 @@ private ClusterResourceListeners 
configureClusterResourceListeners(Deserializer<
 return clusterResourceListeners;
 }
 
-private void close(long timeoutMs, boolean swallowException) {
+private void close(Duration timeout, boolean swallowException) {
 log.trace("Closing the Kafka consumer");
 AtomicReference firstException = new AtomicReference<>();
-try {
-if (coordinator != null)
-coordinator.close(time.timer(Math.min(timeoutMs, 
requestTimeoutMs)));
-} catch (Throwable t) {
-firstException.compareAndSet(null, t);
-log.error("Failed to close coordinator", t);
+
+final Timer closeTimer = (time == null) ? new 
SystemTime().timer(Math.min(timeout.toMillis(), requestTimeoutMs)) : 
time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+// Close objects with a timeout. The timeout is required because 
fetcher makes request to the server in the
+// process of closing which may not respect the overall timeout 
defined for closing the consumer.
+if (coordinator != null) {
+try {
+coordinator.close(closeTimer);

Review Comment:
   note: I think it also tries to commit offsets in a blocking manner during the close.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -446,6 +429,33 @@ private RequestFuture 
sendMetadataRequest(MetadataRequest.Builde
 return client.send(node, request);
 }
 
+/**
+ * Send Fetch Request to Kafka cluster asynchronously.
+ *
+ * This method is visible for testing.
+ *
+ * @return A future that indicates result of sent Fetch request
+ */
+RequestFuture sendFetchRequestToNode(final 
FetchSessionHandler.FetchRequestData requestData,

Review Comment:
   can this be private?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -1933,11 +1943,79 @@ private Map 
topicPartitionTags(TopicPartition tp) {
 }
 }
 
+// Visible for testing
+void maybeCloseFetchSessions(final Timer timer) {
+final Cluster cluster = metadata.fetch();
+final List> requestFutures = new 
ArrayList<>();
+for (final Map.Entry entry : 
sessionHandlers.entrySet()) {
+final FetchSessionHandler sessionHandler = entry.getValue();
+// set the session handler to notify close. This will set the next 
metadata request to send close message.
+sessionHandler.notifyClose();
+
+final int sessionId = sessionHandler.sessionId();
+final Integer fetchTargetNodeId = entry.getKey();
+// FetchTargetNode may not be available as it may have 
disconnected the connection. In such cases, we will
+// skip sending the close request.
+final Node 

[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

2022-12-21 Thread GitBox


junrao commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054846959


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, 
baseOffset: Long): LogSegment = {
 LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
   }
+
+  // Visible for benchmarking
+  def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): 
LogValidator.MetricsRecorder = {
+new LogValidator.MetricsRecorder {
+  def recordInvalidMagic(): Unit =
+allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+  def recordInvalidOffset(): Unit =

Review Comment:
   Sounds good. We can keep this as it is then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14271) Topic recreation fails in KRaft mode when topic contains collidable characters

2022-12-21 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14271:
--
Fix Version/s: (was: 3.4.0)
   (was: 3.3.2)

> Topic recreation fails in KRaft mode when topic contains collidable characters
> --
>
> Key: KAFKA-14271
> URL: https://issues.apache.org/jira/browse/KAFKA-14271
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.0
>Reporter: Jeffrey Tolar
>Priority: Major
> Attachments: topic.with.dots.log, topicwithoutdots.log
>
>
> We recently updated one of our clusters from 3.2.0 to 3.3.0 (primarily to get 
> the fix for KAFKA-13909). This cluster is running KRaft mode.
> This is a cluster used for some integration tests - each test deletes the 
> topics it uses before the test to ensure a clean slate for the test; the 
> brokers get restarted in-between tests, but the broker data isn't deleted.
> With 3.3.0, this semi-crashes Kafka. The brokers stay running, but the topic 
> creation fails:
> {noformat}
> [2022-09-30 17:17:59,216] WARN [Controller 1] createTopics: failed with 
> unknown server exception NoSuchElementException at epoch 1 in 601 us.  
> Renouncing leadership and reverting to the last committed offset 18. 
> (org.apache.kafka.controller.QuorumController)
> java.util.NoSuchElementException
> at 
> org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:167)
> at 
> org.apache.kafka.timeline.SnapshottableHashTable$CurrentIterator.next(SnapshottableHashTable.java:139)
> at 
> org.apache.kafka.timeline.TimelineHashSet$ValueIterator.next(TimelineHashSet.java:120)
> at 
> org.apache.kafka.controller.ReplicationControlManager.validateNewTopicNames(ReplicationControlManager.java:799)
> at 
> org.apache.kafka.controller.ReplicationControlManager.createTopics(ReplicationControlManager.java:567)
> at 
> org.apache.kafka.controller.QuorumController.lambda$createTopics$7(QuorumController.java:1832)
> at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:767)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
> at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
> at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}
> This appears to be because our topic names contain {{.}}'s. Here's a quick 
> reproducer script:
> {noformat}
> #!/bin/bash
> VERSION=3.3.0
> TOPIC=$1
> set -x
> rm -rf -- kafka_2.13-${VERSION} kafka_2.13-${VERSION}.tgz 
> /tmp/kraft-combined-logs
> trap 'kill -- "-$$" && wait' EXIT
> curl -O https://dlcdn.apache.org/kafka/$VERSION/kafka_2.13-${VERSION}.tgz
> tar -xzf kafka_2.13-${VERSION}.tgz
> cd kafka_2.13-${VERSION}
> id=$(./bin/kafka-storage.sh random-uuid)
> ./bin/kafka-storage.sh format -t $id -c ./config/kraft/server.properties
> ./bin/kafka-server-start.sh config/kraft/server.properties > broker.log 2>&1 &
> sleep 1
> ./bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server 
> localhost:9092
> ./bin/kafka-topics.sh --delete --topic "$TOPIC" --bootstrap-server 
> localhost:9092
> ./bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server 
> localhost:9092
> sleep 1
> {noformat}
> Running {{./test-kafka.sh topic.with.dots}} exhibits the failure; using 
> {{topicwithoutdots}} works as expected.
> I'll attach the broker logs from each run.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054835003


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest

Review Comment:
   Ah I missed this. Thanks for sharing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054834094


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##
@@ -52,6 +54,7 @@ CompletableFuture joinGroup(
 );
 
 /**
+<<< HEAD

Review Comment:
   nit: remove



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054833397


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
* Handle an offset commit request
*/
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val header = request.header
+  def handleOffsetCommitRequest(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
 val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-// the callback for sending an offset commit response
-def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-  val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-  if (isDebugEnabled)
-combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-  if (error != Errors.NONE) {
-debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-  s"on partition $topicPartition failed due to 
${error.exceptionName}")
-  }
-}
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
-}
-
-  // reject the request if not authorized to the group
+// Reject the request if not authorized to the group
 if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-  val error = Errors.GROUP_AUTHORIZATION_FAILED
-  val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-offsetCommitRequest.data.topics,
-error)
-
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-new OffsetCommitResponseData()
-.setTopics(responseTopicList)
-.setThrottleTimeMs(requestThrottleMs)
-  ))
+  requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+  CompletableFuture.completedFuture[Unit](())
 } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
   // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
   // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
   // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-  val errorMap = new mutable.HashMap[TopicPartition, Errors]
-  for (topicData <- offsetCommitRequest.data.topics.asScala) {
-for (partitionData <- topicData.partitions.asScala) {
-  val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-  errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-}
-  }
-  sendResponseCallback(errorMap.toMap)
+  requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
 } else {
-  val authorizedTopicRequestInfoBldr = 
immutable.Map.newBuilder[TopicPartition, 
OffsetCommitRequestData.OffsetCommitRequestPartition]
+  val authorizedTopics = authHelper.filterByAuthorized(
+request.context,
+READ,
+TOPIC,
+offsetCommitRequest.data.topics.asScala
+  )(_.name)
+
+  val responseBuilder = new OffsetCommitResponse.Builder()
+  val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+  offsetCommitRequest.data.topics.forEach { topic =>
+if (!authorizedTopics.contains(topic.name)) {
+  // If the topic is not authorized, we add the topic and all its 
partitions
+  // to the response with TOPIC_AUTHORIZATION_FAILED.
+  
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+} else if (!metadataCache.contains(topic.name)) {
+  // If the topic is unknown, we add the topic and all its partitions
+  // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+  
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+} else {
+  // Otherwise, we check all partitions to ensure that they all 

[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054830892


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
 }
 CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+context: RequestContext,
+request: OffsetCommitRequestData,
+bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+val future = new CompletableFuture[OffsetCommitResponseData]()
+
+def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+  val response = new OffsetCommitResponseData()
+  val byTopics = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+  commitStatus.forKeyValue { (tp, error) =>
+var topic = byTopics(tp.topic)
+if (topic == null) {
+  topic = new 
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+  byTopics += tp.topic -> topic
+  response.topics.add(topic)
+}
+topic.partitions.add(new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(error.code))
+  }
+
+  future.complete(response)
+}
+
+// "default" expiration timestamp is now + retention (and retention may be 
overridden if v2)
+// expire timestamp is computed differently for v1 and v2.
+//   - If v1 and no explicit commit timestamp is provided we treat it the 
same as v5.
+//   - If v1 and explicit retention time is provided we calculate 
expiration timestamp based on that

Review Comment:
   I realize this comment was copy-pasted, but we can clean it up I think :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

2022-12-21 Thread GitBox


ijuma commented on PR #13012:
URL: https://github.com/apache/kafka/pull/13012#issuecomment-1362078506

   @junrao Pushed a change that addresses two of the review comments. I left a 
comment for the other review comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054822770


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
* Handle an offset commit request
*/
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val header = request.header
+  def handleOffsetCommitRequest(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
 val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-// the callback for sending an offset commit response
-def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-  val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-  if (isDebugEnabled)
-combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-  if (error != Errors.NONE) {
-debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-  s"on partition $topicPartition failed due to 
${error.exceptionName}")
-  }
-}
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
-}
-
-  // reject the request if not authorized to the group
+// Reject the request if not authorized to the group
 if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-  val error = Errors.GROUP_AUTHORIZATION_FAILED
-  val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-offsetCommitRequest.data.topics,
-error)
-
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-new OffsetCommitResponseData()
-.setTopics(responseTopicList)
-.setThrottleTimeMs(requestThrottleMs)
-  ))
+  requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+  CompletableFuture.completedFuture[Unit](())
 } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
   // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
   // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
   // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-  val errorMap = new mutable.HashMap[TopicPartition, Errors]
-  for (topicData <- offsetCommitRequest.data.topics.asScala) {
-for (partitionData <- topicData.partitions.asScala) {
-  val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-  errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-}
-  }
-  sendResponseCallback(errorMap.toMap)
+  requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
 } else {
-  val authorizedTopicRequestInfoBldr = 
immutable.Map.newBuilder[TopicPartition, 
OffsetCommitRequestData.OffsetCommitRequestPartition]
+  val authorizedTopics = authHelper.filterByAuthorized(
+request.context,
+READ,
+TOPIC,
+offsetCommitRequest.data.topics.asScala
+  )(_.name)
+
+  val responseBuilder = new OffsetCommitResponse.Builder()
+  val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+  offsetCommitRequest.data.topics.forEach { topic =>
+if (!authorizedTopics.contains(topic.name)) {
+  // If the topic is not authorized, we add the topic and all its 
partitions
+  // to the response with TOPIC_AUTHORIZATION_FAILED.
+  
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+} else if (!metadataCache.contains(topic.name)) {
+  // If the topic is unknown, we add the topic and all its partitions
+  // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+  
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+} else {
+  // Otherwise, we check all partitions to ensure that they all 

[GitHub] [kafka] ijuma commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

2022-12-21 Thread GitBox


ijuma commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054822697


##
clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java:
##
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.utils;
 
-package kafka.common
+import org.junit.jupiter.api.Test;
 
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.requests.ProduceResponse.RecordError
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import scala.collection.Seq
-
-class RecordValidationException(val invalidException: ApiException,
-val recordErrors: Seq[RecordError])
-  extends RuntimeException(invalidException) {
+public class PrimitiveRefTest {
+@Test
+public void testLongRef() {

Review Comment:
   `IntRef` was there before, but yes, I can take the chance to improve 
coverage.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 config.messageTimestampDifferenceMaxMs,
 leaderEpoch,
 origin,
-interBrokerProtocolVersion,
-brokerTopicStats,
+interBrokerProtocolVersion
+  )
+  validator.validateMessagesAndAssignOffsets(offset,
+validatorMetricsRecorder(brokerTopicStats.allTopicsStats),

Review Comment:
   Good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

2022-12-21 Thread GitBox


ijuma commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054822187


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, 
baseOffset: Long): LogSegment = {
 LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
   }
+
+  // Visible for benchmarking
+  def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): 
LogValidator.MetricsRecorder = {
+new LogValidator.MetricsRecorder {
+  def recordInvalidMagic(): Unit =
+allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+  def recordInvalidOffset(): Unit =

Review Comment:
   It seemed a bit arbitrary that we combined these into the same metric. I 
thought it may be cleaner to define the interface in a general way and then 
have the implementation match the current behavior. I don't feel too strongly 
though, so if you still prefer to combine these into the same method, I can do 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054819922


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
* Handle an offset commit request
*/
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val header = request.header
+  def handleOffsetCommitRequest(
+request: RequestChannel.Request,
+requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
 val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-// the callback for sending an offset commit response
-def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-  val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-  if (isDebugEnabled)
-combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-  if (error != Errors.NONE) {
-debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-  s"on partition $topicPartition failed due to 
${error.exceptionName}")
-  }
-}
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
-}
-
-  // reject the request if not authorized to the group
+// Reject the request if not authorized to the group
 if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-  val error = Errors.GROUP_AUTHORIZATION_FAILED
-  val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-offsetCommitRequest.data.topics,
-error)
-
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-new OffsetCommitResponseData()
-.setTopics(responseTopicList)
-.setThrottleTimeMs(requestThrottleMs)
-  ))
+  requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+  CompletableFuture.completedFuture[Unit](())
 } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
   // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
   // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
   // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-  val errorMap = new mutable.HashMap[TopicPartition, Errors]
-  for (topicData <- offsetCommitRequest.data.topics.asScala) {
-for (partitionData <- topicData.partitions.asScala) {
-  val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-  errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-}
-  }
-  sendResponseCallback(errorMap.toMap)
+  requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+  CompletableFuture.completedFuture[Unit](())
 } else {
-  val authorizedTopicRequestInfoBldr = 
immutable.Map.newBuilder[TopicPartition, 
OffsetCommitRequestData.OffsetCommitRequestPartition]
+  val authorizedTopics = authHelper.filterByAuthorized(
+request.context,
+READ,
+TOPIC,
+offsetCommitRequest.data.topics.asScala
+  )(_.name)
+
+  val responseBuilder = new OffsetCommitResponse.Builder()
+  val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+  offsetCommitRequest.data.topics.forEach { topic =>
+if (!authorizedTopics.contains(topic.name)) {
+  // If the topic is not authorized, we add the topic and all its 
partitions
+  // to the response with TOPIC_AUTHORIZATION_FAILED.
+  
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+} else if (!metadataCache.contains(topic.name)) {
+  // If the topic is unknown, we add the topic and all its partitions
+  // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+  
responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+} else {
+  // Otherwise, we check all partitions to ensure that they all 

[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054814682


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
 }
 CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+context: RequestContext,
+request: OffsetCommitRequestData,
+bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+val future = new CompletableFuture[OffsetCommitResponseData]()
+
+def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+  val response = new OffsetCommitResponseData()
+  val byTopics = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+  commitStatus.forKeyValue { (tp, error) =>
+var topic = byTopics(tp.topic)
+if (topic == null) {
+  topic = new 
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+  byTopics += tp.topic -> topic
+  response.topics.add(topic)
+}
+topic.partitions.add(new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(error.code))
+  }
+
+  future.complete(response)
+}
+
+// "default" expiration timestamp is now + retention (and retention may be 
overridden if v2)
+// expire timestamp is computed differently for v1 and v2.
+//   - If v1 and no explicit commit timestamp is provided we treat it the 
same as v5.
+//   - If v1 and explicit retention time is provided we calculate 
expiration timestamp based on that

Review Comment:
   This comment is a little confusing. 
   So it seems like I understand v1 semantics -- we use the commit timestamp if 
provided for "now". 
   
   For v2 and beyond, I'm a bit confused about the last two bullets. It makes 
it seem like there is no difference between v2-v4 and v5+, but I think the 
difference is that the retention can no longer be overridden in v5+. That part 
is unclear in the last bullet as it says "partition expiration" but 
"RetentionTimeMs" is the field name.
   
   This is my understanding based on the code
   
   ```
   version:  (can define commit time aka "now"), (can define retention time)
  1 yesno
  2 no yes
  3 no yes
  4 no yes
 5+ no no
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054814682


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
 }
 CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+context: RequestContext,
+request: OffsetCommitRequestData,
+bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+val future = new CompletableFuture[OffsetCommitResponseData]()
+
+def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+  val response = new OffsetCommitResponseData()
+  val byTopics = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+  commitStatus.forKeyValue { (tp, error) =>
+var topic = byTopics(tp.topic)
+if (topic == null) {
+  topic = new 
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+  byTopics += tp.topic -> topic
+  response.topics.add(topic)
+}
+topic.partitions.add(new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
+  .setPartitionIndex(tp.partition)
+  .setErrorCode(error.code))
+  }
+
+  future.complete(response)
+}
+
+// "default" expiration timestamp is now + retention (and retention may be 
overridden if v2)
+// expire timestamp is computed differently for v1 and v2.
+//   - If v1 and no explicit commit timestamp is provided we treat it the 
same as v5.
+//   - If v1 and explicit retention time is provided we calculate 
expiration timestamp based on that

Review Comment:
   This comment is a little confusing. 
   So it seems like I understand v1 semantics -- we use the commit timestamp if 
provided for "now". 
   
   For v2 and beyond, I'm a bit confused about the last two bullets. It makes 
it seem like there is no difference between v2-v4 and v5+, but I think the 
difference is that the retention can no longer be overridden in v5+. That part 
is unclear in the last bullet as it says "partition expiration" but "partition 
retention" is the field name.
   
   This is my understanding based on the code
   
   ```
   version:  (can define commit time aka "now"), (can define retention time)
  1 yesno
  2 no yes
  3 no yes
  4 no yes
 5+ no no
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054793572


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
 }
   }
 
-  @Test
-  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {

Review Comment:
   I need to look into other APIs to see how we usually handle this. At first 
glance, it seems that we are not consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 opened a new pull request, #13035: KAFKA-9087 The changed future log causes that ReplicaAlterLogDirsThre…

2022-12-21 Thread GitBox


chia7712 opened a new pull request, #13035:
URL: https://github.com/apache/kafka/pull/13035

   We encountered this error as well. The root cause is a race condition.
   1. ReplicaAlterLogDirsThread has fetched the data for the topic partition
   1. ReplicaManager#alterReplicaLogDirs changes the future log (the start 
offset is reset to 0)
   1. ReplicaManager#alterReplicaLogDirs calls 
AbstractFetcherManager#addFetcherForPartitions to add the topic partition (it 
just changes the partition state in the ReplicaAlterLogDirsThread)
   1. ReplicaAlterLogDirsThread starts to process the fetched data, and it 
throws IllegalStateException because the future log got renewed and its start 
offset is zero.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054782481


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest

Review Comment:
   ```
   val groups = Map(
   "group-1" -> List(
 new TopicPartition("foo", 0),
 new TopicPartition("foo", 1)
   ).asJava,
   "group-2" -> null,
   "group-3" -> null,
 ).asJava
   ```
   
   With `null`, it fetches all offsets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


cmccabe commented on PR #13033:
URL: https://github.com/apache/kafka/pull/13033#issuecomment-1362010708

   @mumrah : you're right that we need to be checking MV before generating 
these records.
   
   On second thought, I also think we should be using an abort transaction 
record rather than checking record epochs to see if a transaction was aborted 
-- it will just make it easier for tools to process the log. I will make that 
change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on PR #12886:
URL: https://github.com/apache/kafka/pull/12886#issuecomment-1362010257

   @jolshan @OmniaGM Thanks for your comments. I just updated the PR. I have 
tried to simplify the code in KafkaApis as much as possible. Let me know what 
you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054778967


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset commit request
*/
   def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val header = request.header
 val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-// the callback for sending an offset commit response
-def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-  val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-  if (isDebugEnabled)
-combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-  if (error != Errors.NONE) {
-debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-  s"on partition $topicPartition failed due to 
${error.exceptionName}")
+def sendResponse(response: OffsetCommitResponse): Unit = {
+  trace(s"Sending offset commit response $response for correlation id 
${request.header.correlationId} to " +
+s"client ${request.header.clientId}.")
+
+  if (isDebugEnabled) {
+response.data.topics.forEach { topic =>
+  topic.partitions.forEach { partition =>
+if (partition.errorCode != Errors.NONE.code) {
+  debug(s"Offset commit request with correlation id 
${request.header.correlationId} from client ${request.header.clientId} " +
+s"on partition ${topic.name}-${partition.partitionIndex} 
failed due to ${Errors.forCode(partition.errorCode)}")
+}
   }
 }
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
+  }
+
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+response.maybeSetThrottleTimeMs(requestThrottleMs)
+response
+  })
 }
 
-  // reject the request if not authorized to the group
+// Reject the request if not authorized to the group
 if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-  val error = Errors.GROUP_AUTHORIZATION_FAILED
-  val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-offsetCommitRequest.data.topics,
-error)
-
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-new OffsetCommitResponseData()
-.setTopics(responseTopicList)
-.setThrottleTimeMs(requestThrottleMs)
-  ))
+  
sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
 } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
   // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
   // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
   // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-  val errorMap = new mutable.HashMap[TopicPartition, Errors]
-  for (topicData <- offsetCommitRequest.data.topics.asScala) {
-for (partitionData <- topicData.partitions.asScala) {
-  val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-  errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+  
sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+} else {
+  val offsetCommitResponseData = new OffsetCommitResponseData()
+  val topicsPendingPartitions = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+  val authorizedTopics = authHelper.filterByAuthorized(
+request.context,
+READ,
+TOPIC,
+offsetCommitRequest.data.topics.asScala
+  )(_.name)
+
+  def makePartitionResponse(
+partitionIndex: Int,
+error: Errors
+  ): OffsetCommitResponseData.OffsetCommitResponsePartition = {
+new OffsetCommitResponseData.OffsetCommitResponsePartition()
+  .setPartitionIndex(partitionIndex)
+  .setErrorCode(error.code)
+  }
+
+  def addTopicToResponse(
+topic: OffsetCommitRequestData.OffsetCommitRequestTopic,
+error: Errors
+  ): Unit = {
+val topicResponse = new 

[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054778185


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest

Review Comment:
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


cmccabe commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054777972


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1071,7 +1136,11 @@ public ControllerResult generateRecordsAndResult() 
throws Exception {
 log.info("The metadata log appears to be empty. Appending {} 
bootstrap record(s) " +
 "at metadata.version {} from {}.", 
bootstrapMetadata.records().size(),
 bootstrapMetadata.metadataVersion(), 
bootstrapMetadata.source());
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("initial 
transaction"), (short) 0));
 records.addAll(bootstrapMetadata.records());
+records.add(new ApiMessageAndVersion(
+new EndTransactionRecord(), (short) 0));

Review Comment:
   it's not needed but I wanted to get some testing on this. thinking more, we 
should only begin a transaction here when we're doing upgrade-from-zk and in 
premigration (but we need to have that code in first...)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


cmccabe commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054777011


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -76,6 +77,21 @@ class BrokerMetadataListener(
*/
   private var _highestTimestamp = -1L
 
+  /**
+   * The log epoch of the current transaction, or -1 if there is none.
+   */
+  private var _transactionEpoch = -1;
+
+  /**
+   * The log offset of the current transaction, or -1L if there is none.
+   */
+  private var _transactionOffset = -1L;
+
+  /**
+   * Pending records in the current transaction, or null if there is none such.
+   */
+  private var _transactionRecords: util.ArrayList[ApiMessageAndVersion] = _
+

Review Comment:
   yeah, this file will be removed. but that's too ambitious for 3.4 I think



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


cmccabe commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054776436


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -860,7 +886,7 @@ public void handleCommit(BatchReader 
reader) {
 int i = 1;
 for (ApiMessageAndVersion message : messages) {
 try {
-replay(message.message(), 
Optional.empty(), offset);
+replay(message.message(), 
Optional.empty(), offset + i - 1, epoch);

Review Comment:
   previously it was sending the end offset of the batch. it now sends the 
actual record offset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


cmccabe commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054775292


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -289,6 +316,59 @@ class BrokerMetadataListener(
 BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), 
numBytes)
   }
 
+  private def applyRecordToDelta(
+delta: MetadataDelta,
+messageAndVersion: ApiMessageAndVersion,
+baseOffset: Long,
+index: Int,
+snapshotName: Option[String]
+  ): Unit = {
+try {
+  delta.replay(messageAndVersion.message())
+} catch {
+  case e: Throwable => snapshotName match {
+case None => metadataLoadingFaultHandler.handleFault(
+  s"Error replaying metadata log record at offset ${baseOffset + 
index}", e)
+case Some(name) => metadataLoadingFaultHandler.handleFault(
+  s"Error replaying record ${index} from snapshot ${name} at offset 
${_highestOffset}", e)
+  }
+}
+  }
+
+  private def beginTransaction(newTransactionEpoch: Int, newTransactionOffset: 
Long): Unit = {
+if (_transactionEpoch != -1) {
+  abortTransaction("a new begin transaction record appeared", 
newTransactionOffset)
+}
+_transactionEpoch = newTransactionEpoch
+_transactionOffset = newTransactionOffset
+_transactionRecords = new util.ArrayList[ApiMessageAndVersion]
+log.debug("Beginning metadata transaction {}_{}.", _transactionOffset, 
_transactionEpoch)
+  }
+
+  private def endTransaction(delta: MetadataDelta, endOffset: Long): Unit = {
+log.debug("Ending metadata transaction {}_{} at offset {}", 
_transactionOffset, _transactionEpoch, endOffset)
+var index = 0
+_transactionRecords.forEach(record => {

Review Comment:
   yes, I will check if there is a transaction and raise a fault if so.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13029: MINOR: Add zk migration field to the ApiVersionsResponse

2022-12-21 Thread GitBox


cmccabe commented on code in PR #13029:
URL: https://github.com/apache/kafka/pull/13029#discussion_r1054764792


##
clients/src/main/resources/common/message/ApiVersionsResponse.json:
##
@@ -69,6 +69,9 @@
 {"name":  "MinVersionLevel", "type": "int16", "versions":  "3+",
   "about": "The cluster-wide finalized min version level for the 
feature."}
   ]
-}
+},
+{ "name":  "ZkMigrationReady", "type": "bool", "versions": "4+", 
"taggedVersions": "4+",

Review Comment:
   version should be 3, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13001: KAFKA-14446: code style improvements for broker-to-controller forwarding

2022-12-21 Thread GitBox


cmccabe merged PR #13001:
URL: https://github.com/apache/kafka/pull/13001


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica

2022-12-21 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650979#comment-17650979
 ] 

Chia-Ping Tsai commented on KAFKA-9087:
---

We also encountered this error. The root cause is a race condition.
 # ReplicaAlterLogDirsThread has fetched the data for the topic partition
 # ReplicaManager#alterReplicaLogDirs changes the future log (the start offset 
is reset to 0)
 # ReplicaManager#alterReplicaLogDirs calls 
AbstractFetcherManager#addFetcherForPartitions to add the topic partition (it 
just changes the partition state in the ReplicaAlterLogDirsThread)
 # ReplicaAlterLogDirsThread starts to process the fetched data, and it throws 
IllegalStateException because the future log got renewed and its start offset 
is zero.

This bug causes the future log of the topic partition to never get synced, 
because the topic partition is marked as failed. It seems to me that we should 
return None instead of throwing IllegalStateException when the start offset of 
the future log is zero. [~junrao] WDYT?
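A minimal, self-contained sketch of that "return None" idea, purely for 
illustration; the simplified signature and names are assumptions, not the 
actual ReplicaAlterLogDirsThread#processPartitionData code:
{noformat}
// Illustrative guard only: the real method works on Partition/FetchData objects.
def checkFutureLogOffset(topicPartition: String,
                         fetchOffset: Long,
                         futureLogEndOffset: Long): Option[Long] = {
  if (fetchOffset == futureLogEndOffset) {
    Some(fetchOffset) // offsets match: proceed to append the fetched data
  } else if (futureLogEndOffset == 0L) {
    // The future log was swapped/recreated concurrently by alterReplicaLogDirs:
    // skip this batch instead of failing the partition, so the next fetch can
    // resume from the new (empty) future log.
    None
  } else {
    throw new IllegalStateException(
      s"Offset mismatch for the future replica $topicPartition: " +
        s"fetched offset = $fetchOffset, log end offset = $futureLogEndOffset.")
  }
}
{noformat}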

> ReplicaAlterLogDirs stuck and restart fails with 
> java.lang.IllegalStateException: Offset mismatch for the future replica
> 
>
> Key: KAFKA-9087
> URL: https://issues.apache.org/jira/browse/KAFKA-9087
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Gregory Koshelev
>Priority: Major
>
> I've started multiple replica movements between log directories and some 
> partitions were stuck. After the restart of the broker I've got exception in 
> server.log:
> {noformat}
> [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to 
> (kafka.server.ReplicaAlterLogDirsThread)
>  org.apache.kafka.common.KafkaException: Error processing data for partition 
> metrics_timers-35 offset 4224887
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Caused by: java.lang.IllegalStateException: Offset mismatch for the future 
> replica metrics_timers-35: fetched offset = 4224887, log end offset = 0.
>  at 
> kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311)
>  ... 16 more
>  [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
> {noformat}
> Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix 
> the problem. To fix it I've stopped the broker and remove all the stuck 
> future partitions.
> Detailed log below
> {noformat}
> [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest 
> offset in the log is 4224886 (kafka.log.Log)
> [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Loading producer state till offset 4224887 with 
> message format version 2 (kafka.log.Log)
> [2019-06-11 12:21:34,980] INFO [ProducerStateManager 
> partition=metrics_timers-35] Loading producer state from snapshot file 
> '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' 
> (kafka.log.ProducerStateManager)
> [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Completed load of log with 1 segments, log start 
> offset 

[GitHub] [kafka] C0urante commented on a diff in pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks

2022-12-21 Thread GitBox


C0urante commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1054700417


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1658,11 +1659,20 @@ private void backoff(long ms) {
 backoffRetries = BACKOFF_RETRIES;
 }
 
-private void startAndStop(Collection> callables) {
+// Visible for testing
+void startAndStop(Collection> callables) {
 try {
 startAndStopExecutor.invokeAll(callables);
 } catch (InterruptedException e) {
 // ignore
+} catch (RejectedExecutionException e) {
+// Shutting down. Just log the exception
+if (stopping.get()) {
+log.debug("RejectedExecutionException thrown while herder is 
shutting down. This could be " +
+"because startAndStopExecutor is either already 
shutdown or is full.");

Review Comment:
   The [Executors::newFixedThreadPool 
Javadocs](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-java.util.concurrent.ThreadFactory-)
 state that the executor operates off of an "unbounded queue", so it's 
misleading to state that this error could arise because the executor is full.
   
   It would also probably help to be clear to users that this is not a sign 
that something's wrong with the worker.
   
   ```suggestion
   log.debug("Ignoring RejectedExecutionException thrown while 
starting/stopping connectors/tasks en masse " +
   "as the herder is already in the process of shutting 
down. This is not indicative of a problem and is normal behavior");
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -271,6 +275,9 @@ public DistributedHerder(DistributedConfig config,
 this.workerGroupId = 
config.getString(DistributedConfig.GROUP_ID_CONFIG);
 this.workerSyncTimeoutMs = 
config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
 this.workerTasksShutdownTimeoutMs = 
config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+// Timeout for herderExecutor to gracefully terminate is set to a 
value to accommodate
+// reading to the end of the config topic + successfully attempting to 
stop all connectors and tasks and a buffer of 10s
+this.herderExecutorTerminationTimeoutMs = this.workerSyncTimeoutMs + 
this.workerTasksShutdownTimeoutMs + 
Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS + 1;

Review Comment:
   Do these have to be stored as instance variables? It clutters up the class a 
bit to keep having so many private fields that are only used once.
   
   Maybe we could remove the `workerTasksShutdownTimeoutMs` and 
`herderExecutorTerminationTimeoutMs` fields, and inline the logic responsible 
for computing them and move it into `stop`?
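   To illustrate the suggestion, here is a rough, self-contained Java sketch of computing the termination timeout inline on the stop path instead of keeping it in instance fields. The values and the executor are stand-ins; the real code would pull the timeouts from `DistributedConfig` and `Worker` as in the diff above.
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   
   public class HerderShutdownSketch {
       public static void main(String[] args) throws InterruptedException {
           // Stand-in values; in the worker these come from config and Worker constants.
           long workerSyncTimeoutMs = 3_000L;
           long taskShutdownGracefulTimeoutMs = 5_000L;
           long connectorGracefulShutdownTimeoutMs = 5_000L;
   
           ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
           herderExecutor.shutdown();
   
           // Computed inline where it is used (the stop path), rather than stored as a field.
           long terminationTimeoutMs = workerSyncTimeoutMs
                   + taskShutdownGracefulTimeoutMs
                   + connectorGracefulShutdownTimeoutMs;
           boolean terminated = herderExecutor.awaitTermination(terminationTimeoutMs, TimeUnit.MILLISECONDS);
           System.out.println("herder executor terminated cleanly: " + terminated);
       }
   }
   ```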



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3629,6 +3630,64 @@ public void testPollDurationOnSlowConnectorOperations() {
 PowerMock.verifyAll();
 }
 
+@Test(expected = RejectedExecutionException.class)
+@SuppressWarnings("unchecked")
+public void 
shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping()
 throws InterruptedException {
+ExecutorService startAndStopExecutor = 
EasyMock.mock(ExecutorService.class);
+herder.startAndStopExecutor = startAndStopExecutor;
+
+Callable connectorStartingCallable = () -> null;
+
+
EasyMock.expect(startAndStopExecutor.invokeAll(EasyMock.anyObject(Collection.class))).andThrow(new
 RejectedExecutionException());
+
+PowerMock.replayAll(startAndStopExecutor);
+
+
herder.startAndStop(Collections.singletonList(connectorStartingCallable));
+
+}
+
+@Test
+public void 
shouldHaltCleanlyWhenHerderStartsAndStopsAndConfigTopicReadTimesOut() throws 
TimeoutException {
+connectProtocolVersion = CONNECT_PROTOCOL_V1;
+EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+final int rebalanceDelayMs = 2;
+
+// Assign the connector to this worker, and have it start
+expectRebalance(Collections.emptyList(), Collections.emptyList(), 
ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), 
Collections.emptyList(), rebalanceDelayMs);
+
+member.wakeup();
+PowerMock.expectLastCall();
+member.requestRejoin();
+PowerMock.expectLastCall();
+member.maybeLeaveGroup(anyString());
+PowerMock.expectLastCall();
+worker.stopAndAwaitConnectors();
+PowerMock.expectLastCall();
+worker.stopAndAwaitTasks();
+

[GitHub] [kafka] junrao commented on a diff in pull request #13012: KAFKA-14477: Move LogValidator and related to storage module

2022-12-21 Thread GitBox


junrao commented on code in PR #13012:
URL: https://github.com/apache/kafka/pull/13012#discussion_r1054705599


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2185,6 +2188,27 @@ object UnifiedLog extends Logging {
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, 
baseOffset: Long): LogSegment = {
 LocalLog.createNewCleanedSegment(dir, logConfig, baseOffset)
   }
+
+  // Visible for benchmarking
+  def validatorMetricsRecorder(allTopicsStats: BrokerTopicMetrics): 
LogValidator.MetricsRecorder = {
+new LogValidator.MetricsRecorder {
+  def recordInvalidMagic(): Unit =
+allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
+
+  def recordInvalidOffset(): Unit =

Review Comment:
   recordInvalidOffset() and recordInvalidSequence() do the same thing. Should 
we just have a single method?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -846,21 +842,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 config.messageTimestampDifferenceMaxMs,
 leaderEpoch,
 origin,
-interBrokerProtocolVersion,
-brokerTopicStats,
+interBrokerProtocolVersion
+  )
+  validator.validateMessagesAndAssignOffsets(offset,
+validatorMetricsRecorder(brokerTopicStats.allTopicsStats),

Review Comment:
   Could we just create a single instance of MetricsRecorder and reuse it for 
all appends?



##
clients/src/test/java/org/apache/kafka/common/utils/PrimitiveRefTest.java:
##
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.utils;
 
-package kafka.common
+import org.junit.jupiter.api.Test;
 
-import org.apache.kafka.common.errors.ApiException
-import org.apache.kafka.common.requests.ProduceResponse.RecordError
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import scala.collection.Seq
-
-class RecordValidationException(val invalidException: ApiException,
-val recordErrors: Seq[RecordError])
-  extends RuntimeException(invalidException) {
+public class PrimitiveRefTest {
+@Test
+public void testLongRef() {

Review Comment:
   Should we add a similar test for IntRef() too?
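   For reference, a sketch of what such a test could look like, assuming `PrimitiveRef.ofInt` mirrors `ofLong` with a mutable public `value` field (worth double-checking against the actual `PrimitiveRef` API before adding it):
   
   ```java
   package org.apache.kafka.common.utils;
   
   import org.junit.jupiter.api.Test;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   public class PrimitiveRefIntTest {
       @Test
       public void testIntRef() {
           PrimitiveRef.IntRef ref = PrimitiveRef.ofInt(3);
           assertEquals(3, ref.value);
   
           ref.value += 1;
           assertEquals(4, ref.value);
       }
   }
   ```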



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #12979: KAFKA-14544 The "is-future" should be removed from metrics tags after future log becomes current log

2022-12-21 Thread GitBox


chia7712 commented on PR #12979:
URL: https://github.com/apache/kafka/pull/12979#issuecomment-1361909104

   Filed a jira, as this is related to a real bug.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14544) The "is-future" should be removed from metrics tags after future log becomes current log

2022-12-21 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-14544:
--

 Summary: The "is-future" should be removed from metrics tags after 
future log becomes current log
 Key: KAFKA-14544
 URL: https://issues.apache.org/jira/browse/KAFKA-14544
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


we don't remove the "is-future=true" tag from the future log after the future log becomes the "current" log. It causes two potential issues:
 # metrics monitors can't get the metrics of the Log if they don't track the property "is-future=true".
 # all Log metrics of the specified partition get removed if the partition is moved to another folder again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on pull request #12936: KAFKA-13881: Add Streams package infos

2022-12-21 Thread GitBox


mimaison commented on PR #12936:
URL: https://github.com/apache/kafka/pull/12936#issuecomment-1361845545

   @mjsax @ableegoldman Can you take a look? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13881) Add package.java for public package javadoc

2022-12-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650968#comment-17650968
 ] 

Mickael Maison commented on KAFKA-13881:


[~gharris1727] Are you planning to open a PR for the org.apache.kafka.server.log.remote.storage package located in the storage:api module?
See the javadoc for this package: 
https://kafka.apache.org/33/javadoc/org/apache/kafka/server/log/remote/storage/package-frame.html

> Add package.java for public package javadoc
> ---
>
> Key: KAFKA-13881
> URL: https://issues.apache.org/jira/browse/KAFKA-13881
> Project: Kafka
>  Issue Type: Task
>Reporter: Tom Bentley
>Assignee: Greg Harris
>Priority: Trivial
> Fix For: 3.5.0
>
>
> Our public javadoc ([https://kafka.apache.org/31/javadoc/index.html)] doesn't 
> have any package descriptions, which is a bit intimidating for anyone who is 
> new to the project.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lihaosky opened a new pull request, #13034: MINOR: remove onChange call in stream assignor assign method

2022-12-21 Thread GitBox


lihaosky opened a new pull request, #13034:
URL: https://github.com/apache/kafka/pull/13034

   ## Description
   Remove unnecessary calls in assign method
   
   ## Test
   Existing tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13467) Clients never refresh cached bootstrap IPs

2022-12-21 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650964#comment-17650964
 ] 

Ivan Yurchenko commented on KAFKA-13467:


Probably related https://issues.apache.org/jira/browse/KAFKA-8206

> Clients never refresh cached bootstrap IPs
> --
>
> Key: KAFKA-13467
> URL: https://issues.apache.org/jira/browse/KAFKA-13467
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, network
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Follow up ticket to https://issues.apache.org/jira/browse/KAFKA-13405.
> For certain broker rolling upgrade scenarios, it would be beneficial to 
> expired cached bootstrap server IP addresses and re-resolve those IPs to 
> allow clients to re-connect to the cluster without the need to restart the 
> client.
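A minimal, hedged sketch of the idea (re-resolving the bootstrap hostname on demand rather than holding on to the first resolved IPs); the hostname here is just a placeholder, a real client would take it from bootstrap.servers:

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

public class BootstrapReResolveSketch {
    public static void main(String[] args) throws UnknownHostException {
        // Placeholder host; the point is only that resolution is repeated,
        // not done once and cached for the client's lifetime.
        String bootstrapHost = "localhost";

        InetAddress[] before = InetAddress.getAllByName(bootstrapHost);
        // ... brokers are replaced / the DNS record changes during a rolling upgrade ...
        InetAddress[] after = InetAddress.getAllByName(bootstrapHost);

        System.out.println("before: " + Arrays.toString(before));
        System.out.println("after:  " + Arrays.toString(after));
    }
}
{code}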



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #12947: KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic

2022-12-21 Thread GitBox


C0urante merged PR #12947:
URL: https://github.com/apache/kafka/pull/12947


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14543) Move LogOffsetMetadata to storage module

2022-12-21 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14543:
--

 Summary: Move LogOffsetMetadata to storage module
 Key: KAFKA-14543
 URL: https://issues.apache.org/jira/browse/KAFKA-14543
 Project: Kafka
  Issue Type: Sub-task
Reporter: Mickael Maison
Assignee: Mickael Maison






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054648954


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset commit request
*/
   def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val header = request.header
 val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-// the callback for sending an offset commit response
-def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-  val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-  if (isDebugEnabled)
-combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-  if (error != Errors.NONE) {
-debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-  s"on partition $topicPartition failed due to 
${error.exceptionName}")
+def sendResponse(response: OffsetCommitResponse): Unit = {
+  trace(s"Sending offset commit response $response for correlation id 
${request.header.correlationId} to " +
+s"client ${request.header.clientId}.")
+
+  if (isDebugEnabled) {
+response.data.topics.forEach { topic =>
+  topic.partitions.forEach { partition =>
+if (partition.errorCode != Errors.NONE.code) {
+  debug(s"Offset commit request with correlation id 
${request.header.correlationId} from client ${request.header.clientId} " +
+s"on partition ${topic.name}-${partition.partitionIndex} 
failed due to ${Errors.forCode(partition.errorCode)}")
+}
   }
 }
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
+  }
+
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+response.maybeSetThrottleTimeMs(requestThrottleMs)
+response
+  })
 }
 
-  // reject the request if not authorized to the group
+// Reject the request if not authorized to the group
 if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-  val error = Errors.GROUP_AUTHORIZATION_FAILED
-  val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-offsetCommitRequest.data.topics,
-error)
-
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-new OffsetCommitResponseData()
-.setTopics(responseTopicList)
-.setThrottleTimeMs(requestThrottleMs)
-  ))
+  
sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
 } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
   // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
   // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
   // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-  val errorMap = new mutable.HashMap[TopicPartition, Errors]
-  for (topicData <- offsetCommitRequest.data.topics.asScala) {
-for (partitionData <- topicData.partitions.asScala) {
-  val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-  errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+  
sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+} else {
+  val offsetCommitResponseData = new OffsetCommitResponseData()
+  val topicsPendingPartitions = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+  val authorizedTopics = authHelper.filterByAuthorized(
+request.context,
+READ,
+TOPIC,
+offsetCommitRequest.data.topics.asScala
+  )(_.name)
+
+  def makePartitionResponse(
+partitionIndex: Int,
+error: Errors
+  ): OffsetCommitResponseData.OffsetCommitResponsePartition = {
+new OffsetCommitResponseData.OffsetCommitResponsePartition()
+  .setPartitionIndex(partitionIndex)
+  .setErrorCode(error.code)
+  }
+
+  def addTopicToResponse(
+topic: OffsetCommitRequestData.OffsetCommitRequestTopic,
+error: Errors
+  ): Unit = {
+val topicResponse = new 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13029: MINOR: Add zk migration field to the ApiVersionsResponse

2022-12-21 Thread GitBox


cmccabe commented on code in PR #13029:
URL: https://github.com/apache/kafka/pull/13029#discussion_r1054647289


##
clients/src/main/resources/common/message/ApiVersionsResponse.json:
##
@@ -25,9 +25,12 @@
   // not in the header. The length of the header must not change in order to 
guarantee the
   // backward compatibility.
   //
+  // Version 4 adds zk to kraft migration field to the response used to 
determine if the controller
+  // is ready for migration.
+  //
   // Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with 
the supported
   // versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is 
returned.
-  "validVersions": "0-3",
+  "validVersions": "0-4",

Review Comment:
   It's not necessary to increment the version when adding a tagged field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054640668


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
 }
   }
 
-  @Test
-  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {

Review Comment:
   Are we deciding to actually throw the error? If so we should document that. 
   
   Otherwise, maybe we can add a comment in the request file that requesting 
the group multiple times in the request will also give us the response multiple 
times as of this release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054638304


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest

Review Comment:
   Do we have a test here for the fetchAllOffsets path?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14470) Move log layer to storage module

2022-12-21 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650947#comment-17650947
 ] 

Ismael Juma commented on KAFKA-14470:
-

[~mimaison] Dependencies of the log layer classes would have to be moved too, 
yes. In some cases, it may warrant a separate ticket while in others it can be 
bundled with one of the tickets that already exists (left to the discretion of 
the contributor).

Regarding kraft, yes there are some opportunities once the log layer has been completely moved to the storage module. I was having a discussion with Jose yesterday about this and will file a ticket to capture it.

> Move log layer to storage module
> 
>
> Key: KAFKA-14470
> URL: https://issues.apache.org/jira/browse/KAFKA-14470
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>
> We introduced the `storage` module as part of KIP-405, but the existing log 
> layer remains in the `core` module. Moving the log layer to the `storage` 
> module would be another step towards improved modularity and build times 
> (similar to `metadata`, `raft` and `group-coordinator`).
> We should do this in an incremental manner to make the code review process 
> easier. I will create separate tasks, each one mapping to one pull request. 
> In order to understand the feasibility, I tackled a few of the tasks myself.
> Help from the community is appreciated for the unassigned tasks, but it 
> probably makes sense to do that after the initial PRs have been submitted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054636234


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
   offsetFetchResponse
 }
 requestHelper.sendResponseMaybeThrottle(request, createResponse)
+CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
-val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: 
RequestChannel.Request): CompletableFuture[Unit] = {

Review Comment:
   Cool. Thanks for simplifying



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


jolshan commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054632844


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##
@@ -19,6 +19,7 @@
 import java.util.Map.Entry;

Review Comment:
   sounds like a good plan. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13933) Stuck SSL/TLS unit tests in case of authentication failure

2022-12-21 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-13933:

Description: 
When there is an authentication error after the initial TCP connection, the 
Selector never becomes READY, and its SSL/TLS tests wait forever for readiness.

This is actually what happened to me while running SSL/TLS Selector unit tests 
using an OpenJDK build that does not support the required cipher suites.

  was:
When there is an authentication error after the initial TCP connection, the 
Selector never becomes READY, and its SSL/TLS tests wait forever waiting for 
readiness.

This is actually what happened to me while running SSL/TLS Selector unit tests 
using an OpenJDK build that does not support the required cipher suites.


> Stuck SSL/TLS unit tests in case of authentication failure
> --
>
> Key: KAFKA-13933
> URL: https://issues.apache.org/jira/browse/KAFKA-13933
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 3.2.0
>Reporter: Federico Valeri
>Assignee: Federico Valeri
>Priority: Minor
> Fix For: 3.3.0
>
>
> When there is an authentication error after the initial TCP connection, the 
> Selector never becomes READY, and its SSL/TLS tests wait forever for 
> readiness.
> This is actually what happened to me while running SSL/TLS Selector unit 
> tests using an OpenJDK build that does not support the required cipher suites.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] michaeljmarshall commented on a diff in pull request #13032: KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer

2022-12-21 Thread GitBox


michaeljmarshall commented on code in PR #13032:
URL: https://github.com/apache/kafka/pull/13032#discussion_r1054618672


##
clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+public class DataOutputStreamWritableTest {
+@Test
+public void testWritingSlicedByteBuffer() {
+ByteBuffer expectedBuffer = ByteBuffer.wrap(new byte[]{2, 3});
+ByteBuffer originalBuffer = ByteBuffer.allocate(4);
+for (int i = 0; i < originalBuffer.limit(); i++) {
+originalBuffer.put(i, (byte) i);
+}
+// Move position forward to ensure slice is not whole buffer
+originalBuffer.position(2);
+ByteBuffer slicedBuffer = originalBuffer.slice();
+

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2022-12-21 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14132:
--
Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij) *WIP* 
 # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#ff8b00}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#ff8b00}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#ff8b00}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij) *WIP* 
 # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#FF8B00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#FF8B00}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#FF8B00}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij) *WIP* 
>  # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # 

[GitHub] [kafka] C0urante merged pull request #12735: KAFKA-14132: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest

2022-12-21 Thread GitBox


C0urante merged PR #12735:
URL: https://github.com/apache/kafka/pull/12735


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12735: KAFKA-14132: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest

2022-12-21 Thread GitBox


C0urante commented on PR #12735:
URL: https://github.com/apache/kafka/pull/12735#issuecomment-1361648543

   FYI, I've changed the Jira ticket in the title from KAFKA-14059 (which 
refers to a different test class) to KAFKA-14132, which is the umbrella issue 
for migration from EasyMock/PowerMock to Mockito and mentions this test case 
explicitly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] michaeljmarshall commented on a diff in pull request #13032: KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer

2022-12-21 Thread GitBox


michaeljmarshall commented on code in PR #13032:
URL: https://github.com/apache/kafka/pull/13032#discussion_r1054597086


##
clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+public class DataOutputStreamWritableTest {
+@Test
+public void testWritingSlicedByteBuffer() {
+ByteBuffer expectedBuffer = ByteBuffer.wrap(new byte[]{2, 3});
+ByteBuffer originalBuffer = ByteBuffer.allocate(4);
+for (int i = 0; i < originalBuffer.limit(); i++) {
+originalBuffer.put(i, (byte) i);
+}
+// Move position forward to ensure slice is not whole buffer
+originalBuffer.position(2);
+ByteBuffer slicedBuffer = originalBuffer.slice();
+

Review Comment:
   You are right. You provided the solution in the other comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054592382


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset commit request
*/
   def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
-val header = request.header
 val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-// the callback for sending an offset commit response
-def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit 
= {
-  val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ 
nonExistingTopicErrors
-  if (isDebugEnabled)
-combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-  if (error != Errors.NONE) {
-debug(s"Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
-  s"on partition $topicPartition failed due to 
${error.exceptionName}")
+def sendResponse(response: OffsetCommitResponse): Unit = {
+  trace(s"Sending offset commit response $response for correlation id 
${request.header.correlationId} to " +
+s"client ${request.header.clientId}.")
+
+  if (isDebugEnabled) {
+response.data.topics.forEach { topic =>
+  topic.partitions.forEach { partition =>
+if (partition.errorCode != Errors.NONE.code) {
+  debug(s"Offset commit request with correlation id 
${request.header.correlationId} from client ${request.header.clientId} " +
+s"on partition ${topic.name}-${partition.partitionIndex} 
failed due to ${Errors.forCode(partition.errorCode)}")
+}
   }
 }
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-new OffsetCommitResponse(requestThrottleMs, 
combinedCommitStatus.asJava))
+  }
+
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+response.maybeSetThrottleTimeMs(requestThrottleMs)
+response
+  })
 }
 
-  // reject the request if not authorized to the group
+// Reject the request if not authorized to the group
 if (!authHelper.authorize(request.context, READ, GROUP, 
offsetCommitRequest.data.groupId)) {
-  val error = Errors.GROUP_AUTHORIZATION_FAILED
-  val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-offsetCommitRequest.data.topics,
-error)
-
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
new OffsetCommitResponse(
-new OffsetCommitResponseData()
-.setTopics(responseTopicList)
-.setThrottleTimeMs(requestThrottleMs)
-  ))
+  
sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
 } else if (offsetCommitRequest.data.groupInstanceId != null && 
config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
   // Only enable static membership when IBP >= 2.3, because it is not safe 
for the broker to use the static member logic
   // until we are sure that all brokers support it. If static group being 
loaded by an older coordinator, it will discard
   // the group.instance.id field, so static members could accidentally 
become "dynamic", which leads to wrong states.
-  val errorMap = new mutable.HashMap[TopicPartition, Errors]
-  for (topicData <- offsetCommitRequest.data.topics.asScala) {
-for (partitionData <- topicData.partitions.asScala) {
-  val topicPartition = new TopicPartition(topicData.name, 
partitionData.partitionIndex)
-  errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+  
sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+} else {
+  val offsetCommitResponseData = new OffsetCommitResponseData()
+  val topicsPendingPartitions = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+  val authorizedTopics = authHelper.filterByAuthorized(
+request.context,
+READ,
+TOPIC,
+offsetCommitRequest.data.topics.asScala
+  )(_.name)
+
+  def makePartitionResponse(
+partitionIndex: Int,
+error: Errors
+  ): OffsetCommitResponseData.OffsetCommitResponsePartition = {
+new OffsetCommitResponseData.OffsetCommitResponsePartition()
+  .setPartitionIndex(partitionIndex)
+  .setErrorCode(error.code)
+  }
+
+  def addTopicToResponse(
+topic: OffsetCommitRequestData.OffsetCommitRequestTopic,
+error: Errors
+  ): Unit = {
+val topicResponse = new 

[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054582822


##
core/src/main/java/kafka/server/builders/KafkaApisBuilder.java:
##
@@ -178,6 +179,7 @@ public KafkaApis build() {
  metadataSupport,
  replicaManager,
  groupCoordinator,
+ new GroupCoordinatorAdapter(groupCoordinator, 
time),

Review Comment:
   GroupCoordinatorAdapter is an internal change so we don't have to expose it 
in the builder. It only wraps the GroupCoordinator passed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] michaeljmarshall commented on a diff in pull request #13032: KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer

2022-12-21 Thread GitBox


michaeljmarshall commented on code in PR #13032:
URL: https://github.com/apache/kafka/pull/13032#discussion_r1054547725


##
clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java:
##
@@ -99,7 +99,7 @@ public void writeUnsignedVarint(int i) {
 public void writeByteBuffer(ByteBuffer buf) {
 try {
 if (buf.hasArray()) {
-out.write(buf.array(), buf.position(), buf.limit());
+out.write(buf.array(), buf.arrayOffset(), buf.limit());

Review Comment:
   Great catch, thank you for your reviews. I'll fix the implementation and 
make sure the missed cases are covered by tests.
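   For anyone following along, here is a small self-contained Java demo of the ByteBuffer bookkeeping this thread is about: for a sliced or offset heap buffer, the readable bytes start at `arrayOffset() + position()` and span `remaining()` bytes. The helper below is only an illustration of that arithmetic, not the actual `DataOutputStreamWritable` change.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.ByteBuffer;
   import java.util.Arrays;
   
   public class ByteBufferWriteDemo {
       // Write only the readable bytes of a ByteBuffer, accounting for the
       // backing-array offset of sliced buffers and the current position.
       static void writeByteBuffer(OutputStream out, ByteBuffer buf) throws IOException {
           if (buf.hasArray()) {
               out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
           } else {
               ByteBuffer copy = buf.duplicate();
               byte[] bytes = new byte[copy.remaining()];
               copy.get(bytes);
               out.write(bytes);
           }
       }
   
       public static void main(String[] args) throws IOException {
           ByteBuffer original = ByteBuffer.allocate(4);
           for (int i = 0; i < original.limit(); i++) {
               original.put(i, (byte) i);
           }
           original.position(2);
           ByteBuffer sliced = original.slice(); // shares the array, arrayOffset() == 2
   
           ByteArrayOutputStream out = new ByteArrayOutputStream();
           writeByteBuffer(out, sliced);
           System.out.println(Arrays.toString(out.toByteArray())); // prints [2, 3]
       }
   }
   ```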



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14470) Move log layer to storage module

2022-12-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650927#comment-17650927
 ] 

Mickael Maison commented on KAFKA-14470:


Should we also move classes like LogOffsetMetadata? It's in kafka.server but 
it's used quite a bit in LocalLog, UnifiedLog, LogSegment.

There's also a similar LogOffsetMetadata class in the raft module. I'm not very 
familiar with the raft copy but both seem to represent pretty much the same 
thing so we could potentially merge them.
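For context, a minimal sketch (an assumption, not the migrated class) of what a Java LogOffsetMetadata in the storage module could look like, keeping the three pieces of information the kafka.server class carries today: the message offset, the base offset of the containing segment, and the physical position within that segment.

{code:java}
public final class LogOffsetMetadata {
    private static final long UNKNOWN_SEGMENT_BASE_OFFSET = -1L;

    private final long messageOffset;
    private final long segmentBaseOffset;
    private final int relativePositionInSegment;

    public LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }

    public long messageOffset() {
        return messageOffset;
    }

    // true if only the message offset is known, not its physical location on disk
    public boolean messageOffsetOnly() {
        return segmentBaseOffset == UNKNOWN_SEGMENT_BASE_OFFSET;
    }
}
{code}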

> Move log layer to storage module
> 
>
> Key: KAFKA-14470
> URL: https://issues.apache.org/jira/browse/KAFKA-14470
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>
> We introduced the `storage` module as part of KIP-405, but the existing log 
> layer remains in the `core` module. Moving the log layer to the `storage` 
> module would be another step towards improved modularity and build times 
> (similar to `metadata`, `raft` and `group-coordinator`).
> We should do this in an incremental manner to make the code review process 
> easier. I will create separate tasks, each one mapping to one pull request. 
> In order to understand the feasibility, I tackled a few of the tasks myself.
> Help from the community is appreciated for the unassigned tasks, but it 
> probably makes sense to do that after the initial PRs have been submitted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14542) Deprecate OffsetFetch/Commit version 0 and remove them in 4.0

2022-12-21 Thread David Jacot (Jira)
David Jacot created KAFKA-14542:
---

 Summary: Deprecate OffsetFetch/Commit version 0 and remove them in 
4.0
 Key: KAFKA-14542
 URL: https://issues.apache.org/jira/browse/KAFKA-14542
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


We should deprecate version 0 of the OffsetFetch/OffsetCommit APIs and remove it in AK 4.0. Version 0 of those two APIs is used by old clients to write offsets to and read offsets from ZK.

We need a small KIP for this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054496553


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
 }
   }
 
-  @Test
-  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {

Review Comment:
   In my opinion, it is pretty bad to just ignore one group passed in the 
request like this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14541) Profile produce workload for Apache Kafka

2022-12-21 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14541:
-
Description: 
I have been profiling Kafka (3.4.0 / trunk right now) for a produce only 
workload and the [OpenMessaging|https://openmessaging.cloud/docs/benchmarks/] 
workloads. The goal is to get a better understanding of CPU usage profile for 
Kafka and eliminate potential overheads to reduce CPU consumption.
h2. *Setup*

R6i.16xl (64 cores)
OS: Amazon Linux 2

Single broker, One topic, One partition

Plaintext

Prometheus Java agent attached

 
{code:java}
-XX:+PreserveFramePointer
-XX:+UnlockDiagnosticVMOptions
-XX:+DebugNonSafepoints{code}
 

 
{code:java}
queued.max.requests=1
num.network.threads=32
num.io.threads=128
socket.request.max.bytes=104857600{code}
 
h3. Producer setup:
{code:java}
batch.size=9000
buffer.memory=33554432
enable.idempotence=false
linger.ms=0
receive.buffer.bytes=-1
send.buffer.bytes=-1
max.in.flight.requests.per.connection=10{code}
 
h3. Profiler setup:

[async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (this 
profiler + -XX:+DebugNonSafepoints ensure that profiling doesn't suffer from 
safepoint bias). Note that flamegraph can be generated in "cpu" mode or "wall" 
mode (wall clock time) or "cycles" mode (used for better kernel call stack)
h2. Observations
 # Processor.run > Processor#updateRequestMetrics() is a very expensive call. We should revisit whether we want to pre-compute histograms or not. Maybe upgrading to the latest dropwizard will improve this? (JIRA: https://issues.apache.org/jira/browse/KAFKA-14423)
 # (Processor > Selector.poll), (Processor > Selector.write) and many other places - the cumulative cost of Sensor#recordInternal is high.
 # Processor threads are consuming more CPU than Handler threads?!! Perhaps because handler threads spend a lot of time waiting for the partition lock in UnifiedLog.scala.
 # HandleProduceRequest > RequestSizeInBytes - Unnecessary call to calculate size in bytes here. Low-hanging opportunity to improve CPU utilisation for a request-heavy workload. (Fixed in https://issues.apache.org/jira/browse/KAFKA-14414)
 # UnifiedLog#append > HeapByteBuffer.duplicate() - Why do we duplicate the buffer here? Ideally we shouldn't be making copies of the buffer during the produce workflow. We should be using the same buffer from reading off the socket to writing into a file.
 # Processor > Selector.select - Why is epoll consuming CPU cycles? It should leave the thread in a timed_waiting state and hence shouldn't consume CPU at all.
 # In a produce workload, writing to the socket is more CPU intensive than reading from it. This is surprising because reading pulls more data from the socket (the produce records) whereas writing only sends the response back, which doesn't contain record data.
 # RequestChannel#sendResponse > wakeup - This is the call which wakes up the selector by writing to a file descriptor. Why is this so expensive?
 # The bottleneck in throughput is thread contention in UnifiedLog.scala, where handler threads wait 80-90% of the time trying to acquire a lock. This observation was recorded from separate profiling. Can we remove this bottleneck by using fine-grained locks for the high-watermark update and the log append, as the current coarse-grained lock causes contention? (A toy sketch of the fine-grained locking idea is included below.)

I am still analysing the flamegraph (cpu mode attached here). Please feel free to comment on any of the observations or add your own observations here.
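As a toy sketch of the fine-grained locking idea mentioned in observation 9: separate locks for the log append and the high-watermark update instead of one coarse lock. This only illustrates the shape of the idea; it is not the UnifiedLog implementation and ignores the ordering and visibility constraints the real code has to respect.

{code:java}
import java.util.concurrent.locks.ReentrantLock;

public class FineGrainedLockSketch {
    private final ReentrantLock appendLock = new ReentrantLock();
    private final ReentrantLock highWatermarkLock = new ReentrantLock();

    private long logEndOffset = 0L;
    private long highWatermark = 0L;

    // Append path: only contends with other appends, not with high-watermark updates.
    long append(int recordCount) {
        appendLock.lock();
        try {
            logEndOffset += recordCount;
            return logEndOffset;
        } finally {
            appendLock.unlock();
        }
    }

    // High-watermark path: advances independently of the append lock.
    void maybeAdvanceHighWatermark(long newHighWatermark) {
        highWatermarkLock.lock();
        try {
            if (newHighWatermark > highWatermark) {
                highWatermark = newHighWatermark;
            }
        } finally {
            highWatermarkLock.unlock();
        }
    }
}
{code}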

  was:
I have been profiling Kafka (3.4.0 / trunk right now) for a produce only 
workload and the [OpenMessaging|https://openmessaging.cloud/docs/benchmarks/] 
workloads. The goal is to get a better understanding of CPU usage profile for 
Kafka and eliminate potential overheads to reduce CPU consumption.
h2. *Setup*

R6i.16xl (64 cores)
OS: Amazon Linux 2

Single broker, One topic, One partition

Plaintext

Prometheus Java agent attached

 
{code:java}
-XX:+PreserveFramePointer
-XX:+UnlockDiagnosticVMOptions
-XX:+DebugNonSafepoints{code}
 

 
{code:java}
queued.max.requests=1
num.network.threads=32
num.io.threads=128
socket.request.max.bytes=104857600{code}
 
h3. Producer setup:
{code:java}
batch.size=9000
buffer.memory=33554432
enable.idempotence=false
linger.ms=0
receive.buffer.bytes=-1
send.buffer.bytes=-1
max.in.flight.requests.per.connection=10{code}
 
h3. Profiler setup:

[async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (this 
profiler + -XX:+DebugNonSafepoints ensure that profiling doesn't suffer from 
safepoint bias). Note that flamegraph can be generated in "cpu" mode or "wall" 
mode (wall clock time) or "cycles" mode (used for better kernel call stack)
h2. Observations
 # Processor.run > Processor#updateRequestMetrics() is a very expensive call. 
We should revisit whether we want to pre-compute histograms or not. Maybe 
upgrading to 

[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054494916


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1397,77 +1397,172 @@ class KafkaApis(val requestChannel: RequestChannel,
 requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
-val header = request.header
+  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): CompletableFuture[Unit] = {

Review Comment:
   I am not sure. We had a lot of complex logic before. I have tried to 
simplify as much as possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054494189


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1338,11 +1337,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
* Handle an offset fetch request
*/
-  def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+  def handleOffsetFetchRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
 val version = request.header.apiVersion
 if (version == 0) {
   // reading offsets from ZK
   handleOffsetFetchRequestV0(request)
+  CompletableFuture.completedFuture[Unit](())

Review Comment:
   It does not matter where it is. We just need it because this is what the 
method returns.
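
   For illustration only (a standalone snippet, not the KafkaApis code), the pattern of returning an already-completed future from a branch that does all of its work synchronously looks like this:
   ```scala
   import java.util.concurrent.CompletableFuture

   object CompletedUnitFutureExample extends App {
     // A handler declared to return CompletableFuture[Unit] still has to hand back
     // a future from branches that finish all of their work synchronously.
     def handleSynchronously(): CompletableFuture[Unit] = {
       println("handled inline")
       CompletableFuture.completedFuture[Unit](())
     }

     handleSynchronously().get() // already complete, returns immediately
   }
   ```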



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054493387


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
 }
   }
 
-  @Test
-  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {

Review Comment:
   I think that it is a quirk of the implementation. It is because we used HashMaps before this patch, so the last one wins.
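
   A toy illustration of that "last one wins" behaviour (a standalone snippet, not the actual request-handling code; the group names are made up):
   ```scala
   object DuplicateGroupIds extends App {
     // Two entries for the same group id: converting to a Map keeps only the
     // last one, so a repeated group silently overwrites the earlier entry.
     val requested = Seq("group-1" -> Seq(0, 1), "group-1" -> Seq(2))
     val byGroup = requested.toMap
     println(byGroup) // Map(group-1 -> List(2))
   }
   ```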



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054492204


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##
@@ -19,6 +19,7 @@
 import java.util.Map.Entry;

Review Comment:
   As a follow-up, I would like to clean up this class. There are way too many ways to construct this object and the logic is pretty complicated. It is the same for the request. Let's do this separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054490900


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest

Review Comment:
   I added a few unit tests here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2022-12-21 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1054490432


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
   offsetFetchResponse
 }
 requestHelper.sendResponseMaybeThrottle(request, createResponse)
+CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
-val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: 
RequestChannel.Request): CompletableFuture[Unit] = {

Review Comment:
   I have simplified this code even further by pushing the handling of single 
vs multiple groups in the request/response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


mumrah commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054485418


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1071,7 +1136,11 @@ public ControllerResult generateRecordsAndResult() 
throws Exception {
 log.info("The metadata log appears to be empty. Appending {} 
bootstrap record(s) " +
 "at metadata.version {} from {}.", 
bootstrapMetadata.records().size(),
 bootstrapMetadata.metadataVersion(), 
bootstrapMetadata.source());
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("initial 
transaction"), (short) 0));
 records.addAll(bootstrapMetadata.records());
+records.add(new ApiMessageAndVersion(
+new EndTransactionRecord(), (short) 0));

Review Comment:
   Why do we need the bootstrap records wrapped in a transaction? Aren't they 
written as an atomic batch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


mumrah commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054455358


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -289,6 +316,59 @@ class BrokerMetadataListener(
 BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), 
numBytes)
   }
 
+  private def applyRecordToDelta(
+delta: MetadataDelta,
+messageAndVersion: ApiMessageAndVersion,
+baseOffset: Long,
+index: Int,
+snapshotName: Option[String]
+  ): Unit = {
+try {
+  delta.replay(messageAndVersion.message())
+} catch {
+  case e: Throwable => snapshotName match {
+case None => metadataLoadingFaultHandler.handleFault(
+  s"Error replaying metadata log record at offset ${baseOffset + 
index}", e)
+case Some(name) => metadataLoadingFaultHandler.handleFault(
+  s"Error replaying record ${index} from snapshot ${name} at offset 
${_highestOffset}", e)
+  }
+}
+  }
+
+  private def beginTransaction(newTransactionEpoch: Int, newTransactionOffset: 
Long): Unit = {
+if (_transactionEpoch != -1) {
+  abortTransaction("a new begin transaction record appeared", 
newTransactionOffset)
+}
+_transactionEpoch = newTransactionEpoch
+_transactionOffset = newTransactionOffset
+_transactionRecords = new util.ArrayList[ApiMessageAndVersion]
+log.debug("Beginning metadata transaction {}_{}.", _transactionOffset, 
_transactionEpoch)
+  }
+
+  private def endTransaction(delta: MetadataDelta, endOffset: Long): Unit = {
+log.debug("Ending metadata transaction {}_{} at offset {}", 
_transactionOffset, _transactionEpoch, endOffset)
+var index = 0
+_transactionRecords.forEach(record => {

Review Comment:
   We should guard against a null `_transactionRecords` here. If two EndTransactionRecord-s appeared (somehow...), it would raise an NPE.
   
   Same comment for this logic in MetadataLoader
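
   Roughly, the guard could look like the following (a sketch with made-up surrounding names, just to show the null check; not the actual patch):
   ```scala
   import java.util

   object EndTransactionGuard extends App {
     // Stand-in for the listener's pending-records buffer; null means no open transaction.
     var transactionRecords: util.ArrayList[String] = null

     def endTransaction(endOffset: Long): Unit = {
       if (transactionRecords == null) {
         // e.g. a second EndTransactionRecord, or an end without a begin:
         // report it instead of hitting an NPE on forEach below.
         println(s"no transaction in progress at offset $endOffset, ignoring end-transaction")
       } else {
         transactionRecords.forEach(record => println(s"replaying $record"))
         transactionRecords = null
       }
     }

     endTransaction(42L) // safe even though no transaction was ever begun
   }
   ```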



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


mumrah commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054483117


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -76,6 +77,21 @@ class BrokerMetadataListener(
*/
   private var _highestTimestamp = -1L
 
+  /**
+   * The log epoch of the current transaction, or -1 if there is none.
+   */
+  private var _transactionEpoch = -1;
+
+  /**
+   * The log offset of the current transaction, or -1L if there is none.
+   */
+  private var _transactionOffset = -1L;
+
+  /**
+   * Pending records in the current transaction, or null if there is none such.
+   */
+  private var _transactionRecords: util.ArrayList[ApiMessageAndVersion] = _
+

Review Comment:
   I'm guessing all of this logic will go away once we refactor the broker to 
use MetadataLoader?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13033: KAFKA-14538: Implement KRaft metadata transactions

2022-12-21 Thread GitBox


mumrah commented on code in PR #13033:
URL: https://github.com/apache/kafka/pull/13033#discussion_r1054455358


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -289,6 +316,59 @@ class BrokerMetadataListener(
 BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), 
numBytes)
   }
 
+  private def applyRecordToDelta(
+delta: MetadataDelta,
+messageAndVersion: ApiMessageAndVersion,
+baseOffset: Long,
+index: Int,
+snapshotName: Option[String]
+  ): Unit = {
+try {
+  delta.replay(messageAndVersion.message())
+} catch {
+  case e: Throwable => snapshotName match {
+case None => metadataLoadingFaultHandler.handleFault(
+  s"Error replaying metadata log record at offset ${baseOffset + 
index}", e)
+case Some(name) => metadataLoadingFaultHandler.handleFault(
+  s"Error replaying record ${index} from snapshot ${name} at offset 
${_highestOffset}", e)
+  }
+}
+  }
+
+  private def beginTransaction(newTransactionEpoch: Int, newTransactionOffset: 
Long): Unit = {
+if (_transactionEpoch != -1) {
+  abortTransaction("a new begin transaction record appeared", 
newTransactionOffset)
+}
+_transactionEpoch = newTransactionEpoch
+_transactionOffset = newTransactionOffset
+_transactionRecords = new util.ArrayList[ApiMessageAndVersion]
+log.debug("Beginning metadata transaction {}_{}.", _transactionOffset, 
_transactionEpoch)
+  }
+
+  private def endTransaction(delta: MetadataDelta, endOffset: Long): Unit = {
+log.debug("Ending metadata transaction {}_{} at offset {}", 
_transactionOffset, _transactionEpoch, endOffset)
+var index = 0
+_transactionRecords.forEach(record => {

Review Comment:
   We should guard against a null `_transactionRecords` here. If two EndTransactionRecord-s appeared (somehow...), it would raise an NPE.



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -995,6 +1024,42 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
 }
 }
 
+private void beginTransaction(
+long newTransactionOffset,
+int newTransactionEpoch
+) {
+if (transactionEpoch != -1) {
+abortTransaction("a new begin transaction record appeared", 
newTransactionOffset);
+}
+transactionOffset = newTransactionOffset;
+transactionEpoch = newTransactionEpoch;
+snapshotRegistry.getOrCreateSnapshot(transactionOffset);
+log.debug("Beginning metadata transaction {}_{}.", transactionOffset, 
transactionEpoch);
+}
+
+private void endTransaction(long offset) {
+if (transactionEpoch == -1) {
+throw fatalFaultHandler.handleFault("Tried to end a transaction at 
offset " + offset +
+" but there was no current transaction.");
+}
+transactionOffset = -1L;
+transactionEpoch = -1;
+log.debug("Completing metadata transaction {}_{}.", transactionOffset, 
transactionEpoch);
+}
+
+private void abortTransaction(String reason, long offset) {

Review Comment:
   maybe give "offset" a more explicit name?



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -254,6 +270,9 @@ class BrokerMetadataListener(
 while (iterator.hasNext) {
   val batch = iterator.next()
 
+  if (_transactionEpoch != -1 && _transactionEpoch != batch.epoch) {
+abortTransaction("the log epoch changed to " + batch.epoch, 
batch.baseOffset)

Review Comment:
   Since we're in scala here, we can use s-strings
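
   For example (a standalone snippet, assuming the same field names as the diff above):
   ```scala
   object SStringExample extends App {
     case class Batch(epoch: Int, baseOffset: Long)
     val batch = Batch(7, 1234L)

     // concatenation, as in the diff above
     val concatenated = "the log epoch changed to " + batch.epoch
     // equivalent s-interpolated string
     val interpolated = s"the log epoch changed to ${batch.epoch}"

     println(concatenated == interpolated) // prints: true
   }
   ```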



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -860,7 +886,7 @@ public void handleCommit(BatchReader 
reader) {
 int i = 1;
 for (ApiMessageAndVersion message : messages) {
 try {
-replay(message.message(), 
Optional.empty(), offset);
+replay(message.message(), 
Optional.empty(), offset + i - 1, epoch);

Review Comment:
   The `offset` -> `offset + i - 1` change, was that a bug previously? Were we 
just sending the batch offset to replay before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14541) Profile produce workload for Apache Kafka

2022-12-21 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14541:
-
Description: 
I have been profiling Kafka (3.4.0 / trunk right now) for a produce only 
workload and the [OpenMessaging|https://openmessaging.cloud/docs/benchmarks/] 
workloads. The goal is to get a better understanding of CPU usage profile for 
Kafka and eliminate potential overheads to reduce CPU consumption.
h2. *Setup*

R6i.16xl (64 cores)
OS: Amazon Linux 2

Single broker, One topic, One partition

Plaintext

Prometheus Java agent attached

 
{code:java}
-XX:+PreserveFramePointer
-XX:+UnlockDiagnosticVMOptions
-XX:+DebugNonSafepoints{code}
 

 
{code:java}
queued.max.requests=1
num.network.threads=32
num.io.threads=128
socket.request.max.bytes=104857600{code}
 
h3. Producer setup:
{code:java}
batch.size=9000
buffer.memory=33554432
enable.idempotence=false
linger.ms=0
receive.buffer.bytes=-1
send.buffer.bytes=-1
max.in.flight.requests.per.connection=10{code}
 
h3. Profiler setup:

[async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (this 
profiler + -XX:+DebugNonSafepoints ensure that profiling doesn't suffer from 
safepoint bias). Note that flamegraph can be generated in "cpu" mode or "wall" 
mode (wall clock time) or "cycles" mode (used for better kernel call stack)
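
If attaching from inside the JVM is more convenient than the profiler.sh script, async-profiler also exposes a small Java API (one.profiler.AsyncProfiler). The snippet below is only a sketch: it assumes the async-profiler jar and native library are available to the process, and the exact command string may differ between versions.
{code:scala}
import one.profiler.AsyncProfiler

object ProfileProduceWorkload extends App {
  // getInstance() loads libasyncProfiler from java.library.path and attaches it to this JVM
  val profiler = AsyncProfiler.getInstance()
  profiler.execute("start,event=cpu")
  // ... run the produce workload for a while ...
  Thread.sleep(60000)
  profiler.execute("stop,file=flame.html")
}
{code}
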
h2. Observations
 # Processor.run > Processor#updateRequestMetrics() is a very expensive call. We should revisit whether we want to pre-compute histograms or not. Maybe upgrading to the latest Dropwizard will improve this? (JIRA: https://issues.apache.org/jira/browse/KAFKA-14423)
 # (Processor > Selector.poll), (Processor > Selector.write) and many other places - The cumulative cost of Sensor#recordInternal is high.
 # Processor threads are consuming more CPU than handler threads?! Perhaps because handler threads spend a lot of time waiting for the partition lock in UnifiedLog.scala.
 # HandleProduceRequest > RequestSizeInBytes - Unnecessary call to calculate size in bytes here. Low-hanging opportunity to improve CPU utilisation for a request-heavy workload. (Fixed in https://issues.apache.org/jira/browse/KAFKA-14414)
 # UnifiedLog#append > HeapByteBuffer.duplicate() - Why do we duplicate the buffer here? Ideally we shouldn't be making copies of the buffer during the produce workflow. We should be using the same buffer all the way from reading off the socket to writing into the file.
 # Processor > Selector.select - Why is epoll consuming CPU cycles? It should have the thread in a timed_waiting state and hence shouldn't consume CPU at all.
 # In a produce workload, writing to the socket is more CPU intensive than reading from it. This is surprising because reading pulls in more data (the produced records) whereas writing only sends back the response, which doesn't contain record data.
 # RequestChannel#sendResponse > wakeup - This is the call which wakes up the 
selector by writing to a file descriptor. Why is this so expensive?
 # The bottleneck in throughput is thread contention in UnifiedLog.scala, where handler threads wait 80-90% of the time trying to acquire a lock. This observation was recorded from separate profiling.

I am still analysing the flamegraph (cpu mode attached here). Please feel free 
to comment on any of the observations or add your own observations here.

  was:
I have been profiling Kafka (3.4.0 / trunk right now) for a produce only 
workload and the [OpenMessaging|https://openmessaging.cloud/docs/benchmarks/] 
workloads. The goal is to get a better understanding of CPU usage profile for 
Kafka and eliminate potential overheads to reduce CPU consumption.
h2. *Setup*

R6i.16xl (64 cores)
OS: Amazon Linux 2

Single broker, One topic, One partition

Plaintext

Prometheus Java agent attached

 
{code:java}
-XX:+PreserveFramePointer
-XX:+UnlockDiagnosticVMOptions
-XX:+DebugNonSafepoints{code}
 

 
{code:java}
queued.max.requests=1
num.network.threads=32
num.io.threads=128
socket.request.max.bytes=104857600{code}
 
h3. Producer setup:
{code:java}
batch.size=9000
buffer.memory=33554432
enable.idempotence=false
linger.ms=0
receive.buffer.bytes=-1
send.buffer.bytes=-1
max.in.flight.requests.per.connection=10{code}
 
h3. Profiler setup:

[async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (this 
profiler + -XX:+DebugNonSafepoints ensure that profiling doesn't suffer from 
safepoint bias). Note that flamegraph can be generated in "cpu" mode or "wall" 
mode (wall clock time) or "cycles" mode (used for better kernel call stack)
h2. Observations
 # Processor.run > Processor#updateRequestMetrics() is a very expensive call. 
We should revisit whether we want to pre-compute histograms or not. Maybe 
upgrading to latest dropwizard will improve this?
 # (Processor > Selector.poll), (Processor > Selector.write) and many other 
places  - Accumulative cost of 
