[GitHub] [kafka] wenbingshen opened a new pull request, #10625: MINOR: Fix error log for bounce broker

2023-05-05 Thread via GitHub


wenbingshen opened a new pull request, #10625:
URL: https://github.com/apache/kafka/pull/10625

   The bounced leader broker for a topic partition was mistakenly logged as a 
follower broker.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wenbingshen closed pull request #10625: MINOR: Fix error log for bounce broker

2023-05-05 Thread via GitHub


wenbingshen closed pull request #10625: MINOR: Fix error log for bounce broker
URL: https://github.com/apache/kafka/pull/10625


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14968) Consumer Notification Callback

2023-05-05 Thread Hendrik Jan van Randen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hendrik Jan van Randen updated KAFKA-14968:
---
Description: 
We want to use Kafka for all the communication between our services.

We have many high-frequency topics (many messages per second), for which the 
Kafka mechanism of polling by consumers is efficient and suitable.

However, we also have low-frequency topics (few or no messages per day). If 
such a low-frequency message is produced, all consumers should consume it 
immediately. But it would be inefficient if all those consumers polled 
continuously at a high frequency (e.g. every second) while few or no such 
messages are produced each day. It would be most efficient if, for such a 
low-frequency topic, the consumers could register a callback (e.g. a REST 
endpoint) with Kafka, and Kafka called that callback when a message is 
produced. The call could either supply the message itself (pro: the consumer 
can handle it immediately) or just notify the consumer that it should do a 
one-time poll (pro: same message transfer mechanism as everywhere).

I just talked this over with Ryan Corrigan of Confluent.

In the coming weeks I will supply more information about the urgency of this 
issue for my company, make it conform to the Kafka Jira issue guidelines, and 
create a Kafka Improvement Proposal for it.

I will be present at the Kafka Summit in London on May 16 & 17, 2023. Which of 
the Confluent technical staff or Kafka committers will be present at the 
Kafka Summit to talk over and investigate possible solution directions for 
this?
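
To make the proposal concrete, a minimal sketch of the notify-then-poll 
variant. The callback registration is hypothetical (no such API exists in 
Kafka today); only the handler body uses the real consumer API:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NotifyThenPoll {
    // Invoked by the hypothetical Kafka-side notification (e.g. a REST hook
    // that Kafka would call when a record lands on the low-frequency topic).
    public static void onNotification() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "low-frequency-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("low-frequency-topic"));
            // One ordinary poll instead of polling every second all day long.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}
{code}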

  was:
We want to use Kafka for all the communication between our services.

We have many high-frequency topics (many messages per second), for which the 
Kafka mechanism of polling by consumers is efficient and suitable.

However, we also have low-frequency topics (few or no messages per day). If 
such a low-frequency message is produced, all consumers should consume it 
immediately. But it would be inefficient if all those consumers polled 
continuously at a high frequency (e.g. every second) while few or no such 
messages are produced each day. It would be most efficient if, for such a 
low-frequency topic, the consumers could register a callback (e.g. a REST 
endpoint) with Kafka, and Kafka called that callback when a message is 
produced. The call could either supply the message itself (pro: the consumer 
can handle it immediately) or just notify the consumer that it should do a 
one-time poll (pro: same message transfer mechanism as everywhere).

I just talked this over with Ryan Corrigan of Confluent.

In the coming weeks I will supply more information about the urgency of this 
issue.

I will be present at the Kafka Summit in London on May 16 & 17, 2023. Which of 
the Confluent technical staff or Kafka committers will be present at the 
Kafka Summit to talk over and investigate possible solution directions for 
this?


> Consumer Notification Callback
> --
>
> Key: KAFKA-14968
> URL: https://issues.apache.org/jira/browse/KAFKA-14968
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Hendrik Jan van Randen
>Assignee: Hendrik Jan van Randen
>Priority: Major
>
> We want to use Kafka for all the communication between our services. 
> We have many high-frequency topics (many messages per second), for which the 
> Kafka mechanism of polling by consumers is efficient and suitable.
> However, we also have low-frequency topics (few or no messages per day). If 
> such a low-frequency message is produced, all consumers should consume it 
> immediately. But it would be inefficient if all those consumers polled 
> continuously at a high frequency (e.g. every second) while few or no such 
> messages are produced each day. It would be most efficient if, for such a 
> low-frequency topic, the consumers could register a callback (e.g. a REST 
> endpoint) with Kafka, and Kafka called that callback when a message is 
> produced. The call could either supply the message itself (pro: the consumer 
> can handle it immediately) or just notify the consumer that it should do a 
> one-time poll (pro: same message transfer mechanism as everywhere).
> I just talked this over with Ryan Corrigan of Confluent.
> In the coming weeks I will supply more information about the urgency of this 
> issue for my company, make it conform to the Kafka Jira issue guidelines, and 
> create a Kafka Improvement Proposal for it.
> I will be present at the Kafka Summit in London on May 16 & 17, 2023. Which of 
> the Confluent technical staff or Kafka committers will be present at the 
> Kafka Summit to talk over and investigate possible solution directions for 
> this?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on a diff in pull request #13503: MINOR: Refactor TierStateMachine related tests into a separate test file

2023-05-05 Thread via GitHub


junrao commented on code in PR #13503:
URL: https://github.com/apache/kafka/pull/13503#discussion_r1186548617


##
core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.common.errors.FencedLeaderEpochException
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.Map
+import scala.jdk.CollectionConverters._
+
+class ReplicaFetcherTierStateMachineTest {
+
+  val truncateOnFetch = true
+  val topicIds = Map("topic1" -> Uuid.randomUuid(), "topic2" -> 
Uuid.randomUuid())
+  val version = ApiKeys.FETCH.latestVersion()
+  private val failedPartitions = new FailedPartitions
+
+  private def mkBatch(baseOffset: Long, leaderEpoch: Int, records: 
SimpleRecord*): RecordBatch = {

Review Comment:
   It seems that `mkBatch` and `initialFetchState` are now duplicated from 
AbstractFetcherThreadTest. Could we put them in a util class and reuse them?






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer

2023-05-05 Thread via GitHub


vvcephei commented on PR #13455:
URL: https://github.com/apache/kafka/pull/13455#issuecomment-1536829632

   There were a lot of test failures. I just triggered it again.
   
   It doesn't seem implausible that changing the MockConsumer could cause other 
tests to fail, but I didn't look at whether the failing tests actually use it.
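
   For reference, the kind of test interaction KAFKA-14841 changes -- a sketch 
assuming the PR wires ConsumerRebalanceListener callbacks into MockConsumer's 
rebalance():

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerRebalanceSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.subscribe(List.of("topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("revoked: " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("assigned: " + partitions);
            }
        });
        TopicPartition tp = new TopicPartition("topic", 0);
        consumer.updateBeginningOffsets(Map.of(tp, 0L));
        // Historically rebalance() changed the assignment without invoking the
        // listener above; KAFKA-14841 is about wiring those callbacks in, which
        // is why tests relying on the old silence could start failing.
        consumer.rebalance(List.of(tp));
        consumer.poll(Duration.ZERO);
    }
}
```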


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-05 Thread via GitHub


vvcephei commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-1536827381

   I'm re-running the tests to see if we can get a green build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719993#comment-17719993
 ] 

Mickael Maison commented on KAFKA-14084:


Thanks!

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719992#comment-17719992
 ] 

Mickael Maison commented on KAFKA-14831:


Thanks!

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.6.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the {{Sender}} thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.
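
For context, the standard transactional send loop where such an error would 
surface; a sketch (not the KAFKA-14830 reproducer), following the error 
handling pattern from the producer javadoc:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("topic", "key", "value"));
                producer.commitTransaction();
            } catch (KafkaException e) {
                // Today an internal illegal state error is only logged by the
                // Sender thread; making it fatal means it would surface here
                // and the producer would have to be closed, not reused.
                producer.abortTransaction();
            }
        }
    }
}
{code}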



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-05-05 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719988#comment-17719988
 ] 

Kirk True commented on KAFKA-14831:
---

Sounds good, [~jolshan]. I updated the fix version to 3.6.0. Thanks!

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.6.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the {{Sender}} thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-05-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14831:
--
Fix Version/s: 3.6.0
   (was: 3.5.0)

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.6.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the {{Sender}} thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-05-05 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano resolved KAFKA-14084.
---
Resolution: Fixed

SCRAM support in KRaft is implemented.

Tests of SCRAM authorization have been updated to work in KRaft mode.

Migration support of SCRAM credentials from ZK to KRaft has been implemented 
and tested.
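
For reference, SCRAM credentials can be managed through the Admin API 
(KIP-554), which is what now also works against a KRaft cluster; a minimal 
sketch:

{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

public class CreateScramUser {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Upsert a SCRAM-SHA-256 credential for user "alice"; under KRaft
            // this is now persisted in the metadata log instead of ZooKeeper.
            ScramCredentialInfo info =
                new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192);
            admin.alterUserScramCredentials(List.of(
                    new UserScramCredentialUpsertion("alice", info, "alice-secret")))
                .all().get();
        }
    }
}
{code}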

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14970) Dual write mode testing for SCRAM and Quota

2023-05-05 Thread Proven Provenzano (Jira)
Proven Provenzano created KAFKA-14970:
-

 Summary: Dual write mode testing for SCRAM and Quota
 Key: KAFKA-14970
 URL: https://issues.apache.org/jira/browse/KAFKA-14970
 Project: Kafka
  Issue Type: Test
  Components: kraft
Reporter: Proven Provenzano
Assignee: Proven Provenzano


SCRAM credentials and quotas are stored together in ZK, and we need better 
testing to validate the dual-write mode support for them.

I will add some additional tests for this.
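
As background, the quota half of that shared ZK data can be exercised through 
the Admin API; a sketch of the kind of write a dual-write test would have to 
verify in both the KRaft metadata log and ZK:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class AlterUserQuota {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // A user-level quota lives under the same /config/users ZK subtree
            // as that user's SCRAM credentials, which is why the two interact
            // in dual-write mode.
            ClientQuotaEntity entity =
                new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "alice"));
            ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity,
                List.of(new ClientQuotaAlteration.Op("consumer_byte_rate", 1048576.0)));
            admin.alterClientQuotas(List.of(alteration)).all().get();
        }
    }
}
{code}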

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14791) Create a builder class for PartitionRegistration

2023-05-05 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant reassigned KAFKA-14791:


Assignee: Andrew Grant

> Create a builder class for PartitionRegistration
> 
>
> Key: KAFKA-14791
> URL: https://issues.apache.org/jira/browse/KAFKA-14791
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] chia7712 merged pull request #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…

2023-05-05 Thread via GitHub


chia7712 merged PR #13659:
URL: https://github.com/apache/kafka/pull/13659


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs

2023-05-05 Thread via GitHub


mjsax commented on PR #13633:
URL: https://github.com/apache/kafka/pull/13633#issuecomment-1536581114

   Thanks @vvcephei -- for KAFKA-14839 in particular, it's nothing users should 
override; in fact, they should not even extend the class in question at all.
   
   I did not double-check in general though, but I don't think we have any 
`protected` vars on public interfaces/classes that users would need to know 
about.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs

2023-05-05 Thread via GitHub


mjsax commented on code in PR #13633:
URL: https://github.com/apache/kafka/pull/13633#discussion_r1186357597


##
build.gradle:
##
@@ -2057,6 +2057,7 @@ project(':streams') {
   srcJar.dependsOn 'processMessages'
 
   javadoc {
+options.memberLevel = JavadocMemberLevel.PUBLIC  // Document only public 
members/API

Review Comment:
   @atu-sharm -- I'll leave it up to you to change this PR or do a follow-up. 
Just let me know what you prefer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-05-05 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719941#comment-17719941
 ] 

Justine Olshan commented on KAFKA-14831:


I guess I had assumed we missed the code freeze and haven't been treating this 
as a blocker. It might be good to wait until 3.6.

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.5.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the {{Sender}} thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-05 Thread via GitHub


jolshan commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1536545551

   
https://github.com/apache/kafka/commit/ea81e99e5980c807414651034a8c60426a158ca4 
added TransitionToUnitialized. Can we confirm this has the behavior we expect? 
I believe this would count as a background transition.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-05 Thread via GitHub


jolshan commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1186307624


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The epoch of the member when the state was last updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:

Review Comment:
   Do we ever use NEW_TARGET_ASSIGNMENT state? I don't see it in the code 
anywhere else.
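
   As a reading aid, a toy model of the revoke-then-assign bookkeeping the 
javadoc above describes (illustrative names only, not the actual 
CurrentAssignmentBuilder API):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model: split a member's partitions into the three sets the javadoc
// names, and only advance to the target epoch once revocation is done.
final class ToyMemberState {
    final Set<Integer> assigned;          // partitions the member keeps
    final Set<Integer> pendingRevocation; // owned but no longer targeted
    final Set<Integer> pendingAssignment; // targeted but still owned elsewhere

    ToyMemberState(Set<Integer> target, Set<Integer> owned) {
        assigned = new HashSet<>(owned);
        assigned.retainAll(target);
        pendingRevocation = new HashSet<>(owned);
        pendingRevocation.removeAll(target);
        pendingAssignment = new HashSet<>(target);
        pendingAssignment.removeAll(owned);
    }

    boolean canAdvanceEpoch() {
        // The member moves to the target epoch once nothing is left to revoke.
        return pendingRevocation.isEmpty();
    }

    public static void main(String[] args) {
        ToyMemberState s = new ToyMemberState(Set.of(1, 2, 3), Set.of(2, 3, 4));
        System.out.println(s.assigned);           // e.g. [2, 3]
        System.out.println(s.pendingRevocation);  // e.g. [4]
        System.out.println(s.pendingAssignment);  // e.g. [1]
        System.out.println(s.canAdvanceEpoch());  // false until 4 is revoked
    }
}
```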



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14961) DefaultBackgroundThreadTest.testStartupAndTearDown test is flaky

2023-05-05 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719933#comment-17719933
 ] 

Kirk True commented on KAFKA-14961:
---

cc [~pnee] [~lianetm] 

> DefaultBackgroundThreadTest.testStartupAndTearDown test is flaky
> -
>
> Key: KAFKA-14961
> URL: https://issues.apache.org/jira/browse/KAFKA-14961
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> When running the test suite locally I noticed the following error
> {code:java}
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
>   at 
> app//org.apache.kafka.clients.consumer.internals.DefaultBackgroundThreadTest.testStartupAndTearDown(DefaultBackgroundThreadTest.java:95)
>  {code}
> which happened only once, and I could not reproduce it again. 
> I further noticed some NPE in debug logs in the form of
> {code:java}
>  ERROR The background thread failed due to unexpected error 
> (org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread:166)
> java.lang.NullPointerException
>     at 
> org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.handlePollResult(DefaultBackgroundThread.java:200)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>     at 
> java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1675)
>     at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>     at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at 
> java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:553)
>     at 
> org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.runOnce(DefaultBackgroundThread.java:187)
>     at 
> org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread.run(DefaultBackgroundThread.java:159)
>  {code}
> which is due to missing stubs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-05-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14831:
--
Description: In KAFKA-14830, the producer hit an illegal state error. The 
error was propagated to the {{Sender}} thread and logged, but the producer 
otherwise continued on. It would be better to make illegal state errors fatal 
since continuing to write to transactions when the internal state is 
inconsistent may cause incorrect and unpredictable behavior.  (was: In 
KAFKA-14830, the producer hit an illegal state error. The error was propagated 
to the `Sender` thread and logged, but the producer otherwise continued on. It 
would be better to make illegal state errors fatal since continuing to write to 
transactions when the internal state is inconsistent may cause incorrect and 
unpredictable behavior.)

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.5.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the {{Sender}} thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-05-05 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719931#comment-17719931
 ] 

Kirk True commented on KAFKA-14831:
---

[~jolshan] has been providing most of the feedback on the PR. [~jolshan] - what 
do you think? Is it "near" to being done or not? I'd prefer to wait if 
there's any question about correctness/stability.

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.5.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the `Sender` thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-05-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14831:
--
Reviewer: Justine Olshan

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.5.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the `Sender` thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vvcephei commented on pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs

2023-05-05 Thread via GitHub


vvcephei commented on PR #13633:
URL: https://github.com/apache/kafka/pull/13633#issuecomment-1536478379

   Actually, @mjsax , I do have one question before merging: Are there classes 
that are meant to be overridden by users? If so, should their protected members 
be documented?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on a diff in pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs

2023-05-05 Thread via GitHub


vvcephei commented on code in PR #13633:
URL: https://github.com/apache/kafka/pull/13633#discussion_r1186269574


##
build.gradle:
##
@@ -2057,6 +2057,7 @@ project(':streams') {
   srcJar.dependsOn 'processMessages'
 
   javadoc {
+options.memberLevel = JavadocMemberLevel.PUBLIC  // Document only public 
members/API

Review Comment:
   I'd be +1 on expanding the scope to the whole project. If something needs to 
be publicly documented, it should probably be made public.
   
   FWIW, I'm also ok with merging as-is, and following up with a proposal to 
apply it to the whole repo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719922#comment-17719922
 ] 

Mickael Maison commented on KAFKA-14084:


So I think we could close this ticket, as SCRAM support with KRaft is 
effectively in 3.5, and open another ticket to follow up with the tests. WDYT?

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clayburn commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-05 Thread via GitHub


clayburn commented on PR #13676:
URL: https://github.com/apache/kafka/pull/13676#issuecomment-1536412348

   > Thank you for the contribution.
   > 
   > It would be super helpful (perhaps outside of this PR) if you could add a 
section in our documentation [1] on how to monitor the result of the scans. I 
am asking this because we had a similar question from a new contributor 
recently in the email list [2] and you sound like an expert in the CI build(s) 
at Apache :)
   
I would not classify myself as an expert in Apache CI builds, but we have 
quite a bit of experience helping OSS projects with setup and usage of Gradle 
Enterprise to benefit from the build insights.
   
   I would be happy to contribute the documentation updates in a future PR.
   
   > I like this improvement and looking forward to use the new UI to debug our 
flaky test failures. The infra is hosted by Apache, so I don't see any 
compliance concerns as well regarding this change.
   
   We've worked quite closely with the infra team on this rollout.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clayburn commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-05 Thread via GitHub


clayburn commented on code in PR #13676:
URL: https://github.com/apache/kafka/pull/13676#discussion_r1186205824


##
settings.gradle:
##
@@ -13,6 +13,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+plugins {
+id 'com.gradle.enterprise' version '3.13.1'
+id 'com.gradle.common-custom-user-data-gradle-plugin' version '1.10'
+}
+
+def isGithubActions = System.getenv('GITHUB_ACTIONS') != null
+def isJenkins = System.getenv('JENKINS_URL') != null
+def isCI = isGithubActions || isJenkins
+
+gradleEnterprise {
+server = "https://ge.apache.org;
+buildScan {
+capture { taskInputFiles = true }
+uploadInBackground = !isCI
+publishAlways()
+publishIfAuthenticated()
+obfuscation {
+ipAddresses { addresses -> addresses.collect { address -> 
"0.0.0.0"} }

Review Comment:
   ✅



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clayburn commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-05 Thread via GitHub


clayburn commented on code in PR #13676:
URL: https://github.com/apache/kafka/pull/13676#discussion_r1186202287


##
settings.gradle:
##
@@ -13,6 +13,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+plugins {
+id 'com.gradle.enterprise' version '3.13.1'

Review Comment:
   Yes, they would be loaded and applied, but they are essentially a no-op if 
a build scan is not published. The Gradle Enterprise Gradle Plugin only 
monitors the build without affecting it at all, unless you opt in to 
certain features (such as test-retry, for example).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14859) Support SCRAM ZK to KRaft Migration

2023-05-05 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano resolved KAFKA-14859.
---
Resolution: Fixed

Merged into trunk and 3.5

> Support SCRAM ZK to KRaft Migration
> ---
>
> Key: KAFKA-14859
> URL: https://issues.apache.org/jira/browse/KAFKA-14859
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
>
> I want to allow existing ZK installations to be able to migrate to KRaft and 
> support their existing SCRAM credentials.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-05-05 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719899#comment-17719899
 ] 

Proven Provenzano commented on KAFKA-14084:
---

[~mimaison] The last commit for SCRAM migration 
https://issues.apache.org/jira/browse/KAFKA-14859 
[https://github.com/apache/kafka/pull/13628] was merged into master and 3.5.

I am working on more tests but they are not needed for 3.5.

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719886#comment-17719886
 ] 

Mickael Maison commented on KAFKA-14698:


[~akhileshchg]/[~mumrah] What's the status of this ticket? 

> Received request api key LEADER_AND_ISR which is not enabled
> 
>
> Key: KAFKA-14698
> URL: https://issues.apache.org/jira/browse/KAFKA-14698
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Assignee: Akhilesh Chaganti
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
> Attachments: broker0.log, controller.log, test_online_migration.tar.gz
>
>
> I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a 
> single topic "test" with 2 partitions and 2 replicas and the internal 
> __consumer_offsets topics.
> While following the ZooKeeper to KRaft migration steps from 
> [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting 
> issues at the Migrating brokers to KRaft step.
> When I restart a broker as KRaft, it repetitively prints the following error:
> {code:java}
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key LEADER_AND_ISR which is not enabled
> [2023-02-09 16:14:30,334] ERROR Closing socket for 
> 192.168.1.11:9092-192.168.1.11:63737-371 because of error 
> (kafka.network.Processor)
> {code}
> The controller repetitively prints the following error:
> {code:java}
> [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] 
> Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client 
> requested connection close from node 0 
> (org.apache.kafka.clients.NetworkClient)
> [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 
> disconnected. (org.apache.kafka.clients.NetworkClient)
> {code}
> Attached the controller logs and logs from broker-0
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14831) Illegal state errors should be fatal in transactional producer

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719885#comment-17719885
 ] 

Mickael Maison commented on KAFKA-14831:


Is this near completion? If not I'll move this out of 3.5 as we're now past 
code freeze.

> Illegal state errors should be fatal in transactional producer
> --
>
> Key: KAFKA-14831
> URL: https://issues.apache.org/jira/browse/KAFKA-14831
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Major
> Fix For: 3.5.0
>
>
> In KAFKA-14830, the producer hit an illegal state error. The error was 
> propagated to the `Sender` thread and logged, but the producer otherwise 
> continued on. It would be better to make illegal state errors fatal since 
> continuing to write to transactions when the internal state is inconsistent 
> may cause incorrect and unpredictable behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14840) Handle KRaft snapshots in dual-write mode

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719881#comment-17719881
 ] 

Mickael Maison commented on KAFKA-14840:


[~mumrah] The associated PR is merged, is this now complete?

> Handle KRaft snapshots in dual-write mode
> -
>
> Key: KAFKA-14840
> URL: https://issues.apache.org/jira/browse/KAFKA-14840
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1
>
>
> While the KRaft controller is making writes back to ZK during the migration, 
> we need to handle the case when a snapshot is loaded. This can happen for a 
> number of reasons in KRaft.
> The difficulty here is we will need to compare the loaded snapshot with the 
> entire state in ZK. Most likely, this will be a very expensive operation.
> Without this, dual-write mode cannot safely tolerate a snapshot being loaded, 
> so marking this as a 3.5 blocker.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719880#comment-17719880
 ] 

Mickael Maison commented on KAFKA-14918:


Ping [~mumrah] can you share an update? Otherwise I'll move that ticket to the 
next release.

> KRaft controller sending ZK controller RPCs to KRaft brokers
> 
>
> Key: KAFKA-14918
> URL: https://issues.apache.org/jira/browse/KAFKA-14918
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0
>
>
> During the migration, when upgrading a ZK broker to KRaft, the controller is 
> incorrectly sending UpdateMetadata requests to the KRaft controller. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14805) KRaft Controller shouldn't allow metadata updates before migration starts

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719878#comment-17719878
 ] 

Mickael Maison commented on KAFKA-14805:


[~mumrah] The associated PR is merged, is this work complete? Otherwise does 
this need to be in 3.5?

> KRaft Controller shouldn't allow metadata updates before migration starts
> -
>
> Key: KAFKA-14805
> URL: https://issues.apache.org/jira/browse/KAFKA-14805
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0
>
>
> When starting a ZK to KRaft migration, the new KRaft quorum should not accept 
> external metadata updates from things like CreateTopics or 
> AllocateProducerIds. Having metadata present in the log prior to the 
> migration can lead to undefined state, which is not great.
> This is currently causing test failures on trunk because some producer is 
> allocating a producer ID between the time the KRaft quorum starts and the 
> migration starts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14084) Support SCRAM when using KRaft mode

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719877#comment-17719877
 ] 

Mickael Maison commented on KAFKA-14084:


[~cmccabe]/[~pprovenzano] What is the status of this? Have all the changes been 
merged?

> Support SCRAM when using KRaft mode
> ---
>
> Key: KAFKA-14084
> URL: https://issues.apache.org/jira/browse/KAFKA-14084
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: kip-500
> Fix For: 3.5.0
>
>
> Support SCRAM when using KRaft mode, as specified in KIP-631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13670: KAFKA-14962: Trim whitespace from ACL configuration

2023-05-05 Thread via GitHub


divijvaidya commented on PR #13670:
URL: https://github.com/apache/kafka/pull/13670#issuecomment-1536300068

   The failing tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
testElectUncleanLeadersForOnePartition(String).quorum=kraft – 
kafka.api.PlaintextAdminIntegrationTest
   26s
   Build / JDK 11 and Scala 2.13 / testSingleNodeCluster() – 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest
   1m 18s
   Build / JDK 11 and Scala 2.13 / testMultiNodeCluster() – 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest
   1m 52s
   Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-05-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719868#comment-17719868
 ] 

Mickael Maison commented on KAFKA-14901:


[~gharris1727] Do you think this is a blocker for 3.5? I've not been able to 
reproduce this failure and nobody has taken the time to investigate this yet.

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS Source test appears to be very rarely failing (<5% chance) with the 
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
>   at 
> app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling 
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- 
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
>  transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
>  (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to 
> Empty while it already a pending state Ongoing
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
>     at scala.util.Either.flatMap(Either.scala:352)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
>     at 
> kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
>     at java.base/java.lang.Thread.run(Thread.java:829{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14969) Upgrade Mockito to 4.11.0

2023-05-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719865#comment-17719865
 ] 

Ismael Juma commented on KAFKA-14969:
-

I thought we should go straight to 5.x when we drop support for Java 8. Unless 
these issues are also present with Mockito 5 and Java 11 (it wasn't clear to 
me).

> Upgrade Mockito to 4.11.0
> -
>
> Key: KAFKA-14969
> URL: https://issues.apache.org/jira/browse/KAFKA-14969
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Divij Vaidya
>Priority: Minor
> Fix For: 3.6.0
>
>
> Upgrading to Mockito 4.11.0 leads to following errors. Need to fix them 
> before upgrade. Note that we cannot upgrade to Mockito 5 because it doesn't 
> support JDK 8.
> {code:java}
> [2023-05-05T12:21:58.628Z] > Task :core:compileTestScala
> [2023-05-05T12:21:58.628Z] [Error] 
> /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2654:31:
>  ambiguous reference to overloaded definition,
> [2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
> T*)T
> [2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: 
> T)T
> [2023-05-05T12:21:58.628Z] match argument types 
> (kafka.server.metadata.MockConfigRepository)
> [2023-05-05T12:21:58.628Z] [Error] 
> /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2658:25:
>  ambiguous reference to overloaded definition,
> [2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
> T*)T
> [2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: 
> T)T
> [2023-05-05T12:21:58.628Z] match argument types (kafka.log.LogManager)
> [2023-05-05T12:21:58.628Z] [Error] 
> /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2688:31:
>  ambiguous reference to overloaded definition,
> [2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
> T*)T
> [2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: 
> T)T
> [2023-05-05T12:21:58.628Z] match argument types 
> (kafka.server.metadata.MockConfigRepository)
> [2023-05-05T12:21:58.628Z] [Error] 
> /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2692:25:
>  ambiguous reference to overloaded definition,
> [2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
> T*)T
> [2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: 
> T)T
> [2023-05-05T12:21:58.628Z] match argument types (kafka.log.LogManager)
> [2023-05-05T12:21:58.628Z] [Error] 
> /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2728:31:
>  ambiguous reference to overloaded definition,
> [2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
> T*)T
> [2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: 
> T)T
> [2023-05-05T12:21:58.628Z] match argument types 
> (kafka.server.metadata.MockConfigRepository)
> [2023-05-05T12:21:58.628Z] [Error] 
> /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2734:25:
>  ambiguous reference to overloaded definition,
> [2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
> T*)T
> [2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: 
> T)T
> [2023-05-05T12:21:58.628Z] match argument types (kafka.log.LogManager)
> [2023-05-05T12:22:06.267Z] [Error] 
> /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:807:21:
>  ambiguous reference to overloaded definition,
> [2023-05-05T12:22:06.267Z] both method spy in class Mockito of type [T](x$1: 
> T*)T
> [2023-05-05T12:22:06.267Z] and  method spy in class Mockito of type [T](x$1: 
> T)T
> [2023-05-05T12:22:06.267Z] match argument types 
> (controller.eventManager.ControllerEventThread)
> [2023-05-05T12:22:06.267Z] [Error] 
> /home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:838:21:
>  ambiguous reference to overloaded definition,
> [2023-05-05T12:22:06.267Z] both method spy in class Mockito of type [T](x$1: 
> T*)T
> [2023-05-05T12:22:06.267Z] and  method spy in class Mockito of type [T](x$1: 
> T)T
> [2023-05-05T12:22:06.267Z] match argument types 
> (controller.eventManager.ControllerEventThread){code}
> {code:java}
> 
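
For context: the ambiguity is Scala-specific, since for a single argument javac 
prefers the fixed-arity spy(T) overload over the varargs spy(T*), while scalac 
treats the two as equally specific. A minimal sketch of one possible workaround 
(the helper name and package are hypothetical, and this is not necessarily the 
fix that will be adopted) is a small Java shim that Scala tests call instead of 
Mockito.spy:

{code:java}
// Hypothetical shim: javac resolves Mockito.spy(obj) to the fixed-arity
// spy(T) overload, so routing Scala call sites through this helper avoids
// the "ambiguous reference to overloaded definition" error from scalac.
package kafka.utils; // assumed location, for illustration only

import org.mockito.Mockito;

public final class SpyShim {
    private SpyShim() { }

    // Scala tests call SpyShim.spyOn(logManager) instead of Mockito.spy(logManager).
    public static <T> T spyOn(T obj) {
        return Mockito.spy(obj);
    }
}
{code}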

[GitHub] [kafka] lucasbru commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transaction timeouts arising due to long restora…

2023-05-05 Thread via GitHub


lucasbru commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1186098795


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
   In the state updater code path, the restoration for the new task will be 
done by a separate thread, and the main thread will immediately go back to 
processing. There is an internal configuration (StreamsConfig.InternalConfig) 
to enable the state updater; if you have an integration test for this bug, you 
could try it with the flag enabled.
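   
   As a rough illustration, here is a minimal sketch of wiring that flag into 
the test properties. The internal key name below is an assumption based on 
StreamsConfig.InternalConfig and should be verified against the version under 
test:
   
   ```java
   // Sketch only: enabling the (internal, unsupported) state updater for an
   // integration test. The config key name is assumed, not guaranteed.
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   
   public class StateUpdaterTestProps {
       public static Properties props() {
           final Properties p = new Properties();
           p.put(StreamsConfig.APPLICATION_ID_CONFIG, "restore-commit-it");
           p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           // Internal flag (assumed name) that moves restoration to a
           // dedicated thread so the main thread keeps processing.
           p.put("__state.updater.enabled__", true);
           return p;
       }
   }
   ```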



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14969) Upgrade Mockito to 4.11.0

2023-05-05 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14969:


 Summary: Upgrade Mockito to 4.11.0
 Key: KAFKA-14969
 URL: https://issues.apache.org/jira/browse/KAFKA-14969
 Project: Kafka
  Issue Type: Improvement
  Components: unit tests
Reporter: Divij Vaidya
 Fix For: 3.6.0


Upgrading to Mockito 4.11.0 leads to the following errors, which need to be 
fixed before the upgrade. Note that we cannot upgrade to Mockito 5 because it 
doesn't support JDK 8.


{code:java}
[2023-05-05T12:21:58.628Z] > Task :core:compileTestScala
[2023-05-05T12:21:58.628Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2654:31:
 ambiguous reference to overloaded definition,
[2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:21:58.628Z] match argument types 
(kafka.server.metadata.MockConfigRepository)
[2023-05-05T12:21:58.628Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2658:25:
 ambiguous reference to overloaded definition,
[2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:21:58.628Z] match argument types (kafka.log.LogManager)
[2023-05-05T12:21:58.628Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2688:31:
 ambiguous reference to overloaded definition,
[2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:21:58.628Z] match argument types 
(kafka.server.metadata.MockConfigRepository)
[2023-05-05T12:21:58.628Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2692:25:
 ambiguous reference to overloaded definition,
[2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:21:58.628Z] match argument types (kafka.log.LogManager)
[2023-05-05T12:21:58.628Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2728:31:
 ambiguous reference to overloaded definition,
[2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:21:58.628Z] match argument types 
(kafka.server.metadata.MockConfigRepository)
[2023-05-05T12:21:58.628Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:2734:25:
 ambiguous reference to overloaded definition,
[2023-05-05T12:21:58.628Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:21:58.628Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:21:58.628Z] match argument types (kafka.log.LogManager)
[2023-05-05T12:22:06.267Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:807:21:
 ambiguous reference to overloaded definition,
[2023-05-05T12:22:06.267Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:22:06.267Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:22:06.267Z] match argument types 
(controller.eventManager.ControllerEventThread)
[2023-05-05T12:22:06.267Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:838:21:
 ambiguous reference to overloaded definition,
[2023-05-05T12:22:06.267Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:22:06.267Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:22:06.267Z] match argument types 
(controller.eventManager.ControllerEventThread){code}
{code:java}
[2023-05-05T12:22:40.865Z] > Task :core:compileTestScala
[2023-05-05T12:22:40.865Z] [Error] 
/home/jenkins/jenkins-agent/712657a4/workspace/Kafka_kafka-pr_PR-13673/core/src/test/scala/unit/kafka/log/LogManagerTest.scala:171:18:
 ambiguous reference to overloaded definition,
[2023-05-05T12:22:40.865Z] both method spy in class Mockito of type [T](x$1: 
T*)T
[2023-05-05T12:22:40.865Z] and  method spy in class Mockito of type [T](x$1: T)T
[2023-05-05T12:22:40.865Z] match argument types (kafka.log.LogManager) and 
expected result type 

[jira] [Updated] (KAFKA-14968) Consumer Notification Callback

2023-05-05 Thread Hendrik Jan van Randen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hendrik Jan van Randen updated KAFKA-14968:
---
Description: 
We want to use Kafka for all the communication between our services. 

We have many high frequency topics (many messages per second), for which the 
Kafka mechanism of polling by consumers is efficient and suitable.

However, we also have low frequency topics (no or few messages per day). If 
such a low frequency message is produced, all consumers should consume it 
immediately. But it would be inefficient if all those consumers are polling 
continuously with a high frequency (e.g. each second), while no or few such 
messages are produced each day. Most efficient would be if for such a low 
frequency topic the consumers can register a callback (e.g. a REST endpoint) 
with Kafka, and Kafka calls that callback when a message is produced. Either 
supplying the message in the call (pro: consumer can handle it immediately), or 
just to notify the consumer that it should do a one time poll (pro: same 
message transfer mechanism as everywhere).

I just talked this over with Ryan Corrigan of Confluent.

In the coming weeks I will supply more information about the urgency of this 
issue.

I will be present at the Kafka Summit in London on May 16 & 17, 2023. Which of 
the Confluent technical staff or Kafka committers will be present at the Kafka 
Summit to talk over and investigate possible solution directions for this?

  was:
We want to use Kafka for all the communication between our services. 

We have many high frequency topics (many messages per second), for which the 
Kafka mechanism of polling by consumers is efficient and suitable.

However, we also have low frequency topics (no or few messages per day). If 
such a low frequency message is produced, all consumers should consume it 
immediately. But it would be inefficient if all those consumers are polling 
continuously with a high frequency (e.g. each second), while no or few such 
messages are produced each day. Most efficient would be if for such a low 
frequency topic the consumers can register a callback (e.g. a REST endpoint) 
with Kafka, and Kafka calls that callback when a message is produced. Either 
supplying the message in the call (pro: consumer can handle it immediately), or 
just to notify the consumer that it should do a one time poll (pro: same 
message transfer mechanism as everywhere).

I just talked this over with Ryan Corrigan of Confluent.

In the coming weeks I will supply more information about the urgency of this 
issue.

I will be present at the Kafka Summit in London on May 16 & 17, 2023. Which of 
the Confluent technical staff or Kafka committers will be present at the Kafka 
Summit to investigate possible solution directions for this?


> Consumer Notification Callback
> --
>
> Key: KAFKA-14968
> URL: https://issues.apache.org/jira/browse/KAFKA-14968
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Hendrik Jan van Randen
>Assignee: Hendrik Jan van Randen
>Priority: Major
>
> We want to use Kafka for all the communication between our services. 
> We have many high frequency topics (many messages per second), for which the 
> Kafka mechanism of polling by consumers is efficient and suitable.
> However, we also have low frequency topics (no or few messages per day). If 
> such a low frequency message is produced, all consumers should consume it 
> immediately. But it would be inefficient if all those consumers are polling 
> continuously with a high frequency (e.g. each second), while no or few such 
> messages are produced each day. Most efficient would be if for such a low 
> frequency topic the consumers can register a callback (e.g. a REST endpoint) 
> with Kafka, and Kafka calls that callback when a message is produced. Either 
> supplying the message in the call (pro: consumer can handle it immediately), 
> or just to notify the consumer that it should do a one time poll (pro: same 
> message transfer mechanism as everywhere).
> I just talked this over with Ryan Corrigan of Confluent.
> In the coming weeks I will supply more information about the urgency of this 
> issue.
> I will be present at the Kafka Summit in London on May 16 & 17, 2023. Which 
> of the Confluent technical staff or Kafka committers will be present at the 
> Kafka Summit to talk over and investigate possible solution directions for 
> this?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14968) Consumer Notification Callback

2023-05-05 Thread Hendrik Jan van Randen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hendrik Jan van Randen reassigned KAFKA-14968:
--

Assignee: Hendrik Jan van Randen

> Consumer Notification Callback
> --
>
> Key: KAFKA-14968
> URL: https://issues.apache.org/jira/browse/KAFKA-14968
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Hendrik Jan van Randen
>Assignee: Hendrik Jan van Randen
>Priority: Major
>
> We want to use Kafka for all the communication between our services. 
> We have many high frequency topics (many messages per second), for which the 
> Kafka mechanism of polling by consumers is efficient and suitable.
> However, we also have low frequency topics (no or few messages per day). If 
> such a low frequency message is produced, all consumers should consume it 
> immediately. But it would be inefficient if all those consumers are polling 
> continuously with a high frequency (e.g. each second), while no or few such 
> messages are produced each day. Most efficient would be if for such a low 
> frequency topic the consumers can register a callback (e.g. a REST endpoint) 
> with Kafka, and Kafka calls that callback when a message is produced. Either 
> supplying the message in the call (pro: consumer can handle it immediately), 
> or just to notify the consumer that it should do a one time poll (pro: same 
> message transfer mechanism as everywhere).
> I just talked this over with Ryan Corrigan of Confluent.
> In the coming weeks I will supply more information about the urgency of this 
> issue.
> I will be present at the Kafka Summit in London on May 16 & 17, 2023. Which 
> of the Confluent technical staff or Kafka committers will be present at the 
> Kafka Summit to investigate possible solution directions for this?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14968) Consumer Notification Callback

2023-05-05 Thread Hendrik Jan van Randen (Jira)
Hendrik Jan van Randen created KAFKA-14968:
--

 Summary: Consumer Notification Callback
 Key: KAFKA-14968
 URL: https://issues.apache.org/jira/browse/KAFKA-14968
 Project: Kafka
  Issue Type: New Feature
  Components: consumer
Reporter: Hendrik Jan van Randen


We want to use Kafka for all the communication between our services. 

We have many high frequency topics (many messages per second), for which the 
Kafka mechanism of polling by consumers is efficient and suitable.

However, we also have low frequency topics (no or few messages per day). If 
such a low frequency message is produced, all consumers should consume it 
immediately. But it would be inefficient if all those consumers are polling 
continuously with a high frequency (e.g. each second), while no or few such 
messages are produced each day. Most efficient would be if for such a low 
frequency topic the consumers can register a callback (e.g. a REST endpoint) 
with Kafka, and Kafka calls that callback when a message is produced. Either 
supplying the message in the call (pro: consumer can handle it immediately), or 
just to notify the consumer that it should do a one time poll (pro: same 
message transfer mechanism as everywhere).
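
A minimal sketch of the polling status quo described above (topic name, group 
id, and timeout are illustrative): a long poll timeout keeps such a consumer 
from busy-polling, but it still wakes up regularly even when nothing arrives, 
which is exactly the inefficiency a push-style callback would remove.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LowFrequencyPoller {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "low-frequency-listeners");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("rare-events"));
            while (true) {
                // Blocks for up to 30 seconds; with no traffic this still wakes
                // about twice a minute, the overhead a callback would remove.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
                for (ConsumerRecord<String, String> r : records)
                    System.out.printf("handled %s at offset %d%n", r.value(), r.offset());
            }
        }
    }
}
{code}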

I just talked this over with Ryan Corrigan of Confluent.

In the coming weeks I will supply more information about the urgency of this 
issue.

I will be present at the Kafka Summit in London on May 16 & 17, 2023. Which of 
the Confluent technical staff or Kafka committers will be present at the Kafka 
Summit to investigate possible solution directions for this?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rohits64 closed pull request #13677: KAFKA-14401 with test [WIP]

2023-05-05 Thread via GitHub


rohits64 closed pull request #13677: KAFKA-14401 with test [WIP]
URL: https://github.com/apache/kafka/pull/13677


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rohits64 commented on pull request #13677: KAFKA-14401 with test [WIP]

2023-05-05 Thread via GitHub


rohits64 commented on PR #13677:
URL: https://github.com/apache/kafka/pull/13677#issuecomment-1536165839

   Duplicate


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-05 Thread via GitHub


divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1536165039

   Thank you @showuon and @ijuma for your time and patience in this review 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-05 Thread via GitHub


showuon merged PR #13312:
URL: https://github.com/apache/kafka/pull/13312


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-05 Thread via GitHub


showuon commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1536156206

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest.testBackoffAndRetryUponRetriableError()
   Build / JDK 8 and Scala 2.12 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 17 and Scala 2.13 / 
kafka.api.PlaintextAdminIntegrationTest.testAclOperations(String).quorum=zk
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-05 Thread via GitHub


divijvaidya commented on PR #13623:
URL: https://github.com/apache/kafka/pull/13623#issuecomment-1536155781

   Thank you @dajac. Appreciate your time and patience on this PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-05 Thread via GitHub


showuon commented on PR #13660:
URL: https://github.com/apache/kafka/pull/13660#issuecomment-1536155265

   Retriggering the CI build since it failed with an infra issue.
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13660/3/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-05 Thread via GitHub


showuon commented on PR #13515:
URL: https://github.com/apache/kafka/pull/13515#issuecomment-1536153857

   Retriggering the CI build since it failed with an infra issue.
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13515/8/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14926) Remove metrics on Log Cleaner shutdown

2023-05-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14926.
-
Resolution: Fixed

> Remove metrics on Log Cleaner shutdown
> --
>
> Key: KAFKA-14926
> URL: https://issues.apache.org/jira/browse/KAFKA-14926
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Minor
> Fix For: 3.6.0
>
>
> We register metrics with the KafkaMetricsGroup in LogCleaner.scala but we 
> don't remove them on shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac merged pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-05 Thread via GitHub


dajac merged PR #13623:
URL: https://github.com/apache/kafka/pull/13623


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13673: MINOR: Update dependencies (minor versions only)

2023-05-05 Thread via GitHub


divijvaidya commented on code in PR #13673:
URL: https://github.com/apache/kafka/pull/13673#discussion_r1185999578


##
gradle/dependencies.gradle:
##
@@ -142,7 +142,7 @@ libs += [
   apachedsMavibotPartition: 
"org.apache.directory.server:apacheds-mavibot-partition:$versions.apacheds",
   apachedsJdbmPartition: 
"org.apache.directory.server:apacheds-jdbm-partition:$versions.apacheds",
   argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
-  bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
+  bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix",

Review Comment:
   FYI, reviewers: this switches to the Bouncy Castle artifact that is 
compatible with JDK 1.8 onwards, instead of the earlier one targeting JDK 1.5.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13673: MINOR: Update dependencies (minor versions only)

2023-05-05 Thread via GitHub


divijvaidya commented on code in PR #13673:
URL: https://github.com/apache/kafka/pull/13673#discussion_r1185999578


##
gradle/dependencies.gradle:
##
@@ -142,7 +142,7 @@ libs += [
   apachedsMavibotPartition: 
"org.apache.directory.server:apacheds-mavibot-partition:$versions.apacheds",
   apachedsJdbmPartition: 
"org.apache.directory.server:apacheds-jdbm-partition:$versions.apacheds",
   argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
-  bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
+  bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix",

Review Comment:
   This switches to the Bouncy Castle artifact that is compatible with JDK 1.8 
onwards, instead of the earlier one targeting JDK 1.5.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rohits64 opened a new pull request, #13677: KAFKA-14401 with test [WIP]

2023-05-05 Thread via GitHub


rohits64 opened a new pull request, #13677:
URL: https://github.com/apache/kafka/pull/13677

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…

2023-05-05 Thread via GitHub


chia7712 commented on PR #13659:
URL: https://github.com/apache/kafka/pull/13659#issuecomment-1536128806

   Failed tests are unrelated. Will merge it later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transaction timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185979279


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   I see. Either way, moving the flag around fails one condition or another. I 
have asked Lucas in the thread above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transaction timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185978385


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
   Thanks @lucasbru. Are you suggesting that with the new state updater code, 
we won't get the problems this PR is trying to solve? I am not aware of the 
new state updater code path, TBH.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transaction timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185975845


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");

Review Comment:
   I used INFO because I thought this is something users should be able to see 
by default. I expect this won't be a very frequent case, so notifying by 
default should be fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transaction timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185974966


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {

Review Comment:
   EOS_V2 was suggested by Guozhang 
[here](https://github.com/apache/kafka/pull/11433#discussion_r1147064683). 
Yeah, I think !newActiveStatefulTasks.isEmpty() seems like the right condition 
to add.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transaction timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185973531


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);

Review Comment:
   The call to `createNewTasks` was already there in `handleAssignment` to 
create new tasks. I just modified its return type so we can check whether the 
result is empty before triggering the task and offset commits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transaction timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185970601


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;

Review Comment:
   Yeah, this one was suggested by Guozhang. It is still not passing all the 
tests, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transaction timeouts arising due to long restora…

2023-05-05 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185968862


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final
 }
 }
 
+private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {

Review Comment:
   Yeah, that was pointed out by Guozhang as well 
[here](https://github.com/apache/kafka/pull/11433#discussion_r1147063043) and I 
had already made the change. Looks like you looked at an outdated version of 
this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] huxiangquan commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-05 Thread via GitHub


huxiangquan commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1536010891

Can someone help with Kafka on Windows, please?
   Two years have passed!
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-05 Thread via GitHub


divijvaidya commented on code in PR #13676:
URL: https://github.com/apache/kafka/pull/13676#discussion_r1185859941


##
settings.gradle:
##
@@ -13,6 +13,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+plugins {
+id 'com.gradle.enterprise' version '3.13.1'

Review Comment:
   Would these plugins be loaded for local builds as well (I understand that, 
based on the code below, they won't be used)?



##
settings.gradle:
##
@@ -13,6 +13,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+plugins {
+id 'com.gradle.enterprise' version '3.13.1'
+id 'com.gradle.common-custom-user-data-gradle-plugin' version '1.10'
+}
+
+def isGithubActions = System.getenv('GITHUB_ACTIONS') != null
+def isJenkins = System.getenv('JENKINS_URL') != null
+def isCI = isGithubActions || isJenkins
+
+gradleEnterprise {
server = "https://ge.apache.org"
+buildScan {
+capture { taskInputFiles = true }
+uploadInBackground = !isCI
+publishAlways()
+publishIfAuthenticated()
+obfuscation {
+ipAddresses { addresses -> addresses.collect { address -> 
"0.0.0.0"} }

Review Comment:
   Please add a comment here so that future maintainers can understand the 
motivation for this line.
   
   Something like: "We are obfuscating the IP addresses of the CI hosts here. 
Hostnames will still be available in the scans to detect any host-specific 
slowness / flaky errors."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-05-05 Thread via GitHub


clolov commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1185861417


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
 assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
 }
 
+@ParameterizedTest
+@ValueSource(booleans = { true, false })
+public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+// Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+// Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+// Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+// Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+// timer never moves forward once the network client is invoked. If 
there is no available
+// response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+// enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+// queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+int offsetCommitCalls = 5;
+long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+IntStream.range(0, offsetCommitCalls).forEach(__ ->
+prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+// UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+// the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+// we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+// offset commit failed is returned.
+
+Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+t1p,
+new OffsetAndMetadata(100L, "metadata")
+);
+
+if (commitSync) {
+assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+
+} else {
+AtomicBoolean callbackInvoked = new AtomicBoolean();
+coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+assertSame(inputOffsets, offsets);
+assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+callbackInvoked.set(true);
+});
+
+coordinator.invokeCompletedOffsetCommitCallbacks();
+assertTrue(callbackInvoked.get());
+}
+}
+
+@ParameterizedTest
+@ValueSource(booleans = { true, false })
+public void testRetryCommitUnknownTopicId(boolean commitSync) {
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.UNKNOWN_TOPIC_ID)));
+client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.NONE)));
+
+Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+t1p,
+new OffsetAndMetadata(100L, "metadata")
+);
+
+if (commitSync) {
+assertTrue(coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));
+
+} else {
+AtomicBoolean callbackInvoked = new AtomicBoolean();
+coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+// Unlike the commit offset sync API, the async API does not 
retry.

Review Comment:
   Yup, I will get to this today



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-05 Thread via GitHub


urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1185797643


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -424,6 +477,11 @@ ProducerIdAndEpoch producerIdAndEpoch() {
 }
 
 synchronized public void maybeUpdateProducerIdAndEpoch(TopicPartition 
topicPartition) {
+if (hasFatalError()) {

Review Comment:
   @kirktrue I agree, it's better not to mutate the state after we have 
encountered a fatal error.
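   
   A minimal sketch of that guard in isolation (names are illustrative, not 
Kafka's actual fields): once a fatal error is recorded, further transitions 
surface the original failure instead of mutating state the caller can no 
longer trust.
   
   ```java
   // Illustrative only: refuse to mutate state after a fatal error.
   public final class GuardedState {
       private RuntimeException fatalError; // set once, on the first fatal failure
       private long epoch;
   
       public synchronized void transition() {
           if (fatalError != null) {
               // Re-surface the original failure rather than move to a state
               // callers can no longer trust.
               throw new IllegalStateException("Cannot transition after fatal error", fatalError);
           }
           epoch++; // the actual state change
       }
   
       public synchronized void markFatal(final RuntimeException e) {
           if (fatalError == null) {
               fatalError = e;
           }
       }
   }
   ```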



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0

2023-05-05 Thread via GitHub


bmscomp commented on PR #13662:
URL: https://github.com/apache/kafka/pull/13662#issuecomment-1535937778

   @divijvaidya I get your point, thanks so much for those explanations.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-05 Thread via GitHub


cmccabe commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1535924639

   > @Hangleton : Agree that this change should be ideally confined to the 
Windows platform w/o any code change for others.
   
   Yeah. To be totally clear, it's OK to change the main code paths, but it 
really raises the stakes because then we want to understand that there haven't 
been performance or correctness regressions.
   
   @MPeli: I really do want this to get in eventually, but in its current form 
it's very difficult to review. As a first step, how about splitting out just 
the FileRecords.java change, for example? If you want to do preallocation 
differently on Windows, then explain how it will be different and why, and have 
a PR that just does that. Since preallocation seems to be working fine on other 
platforms, this particular change could be put behind an isOsWindows check.
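   
   For illustration, the kind of platform gate that last suggestion implies (a 
sketch under assumed requirements; the real Windows-specific strategy would 
need its own design and review):
   
   ```java
   // Sketch: isolate Windows-specific preallocation behind one check so the
   // existing code path on other platforms stays unchanged.
   import java.io.File;
   import java.io.IOException;
   import java.io.RandomAccessFile;
   
   public final class PreallocationGate {
       static final boolean IS_WINDOWS =
           System.getProperty("os.name").toLowerCase().startsWith("windows");
   
       static void preallocate(final File file, final long sizeBytes) throws IOException {
           try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
               if (IS_WINDOWS && sizeBytes > 0) {
                   // Hypothetical Windows branch: touch the last byte to force
                   // real allocation instead of a sparse file.
                   raf.setLength(sizeBytes);
                   raf.seek(sizeBytes - 1);
                   raf.write(0);
               } else {
                   raf.setLength(sizeBytes); // existing behaviour, untouched
               }
           }
       }
   }
   ```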


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-05-05 Thread via GitHub


cmccabe merged PR #13461:
URL: https://github.com/apache/kafka/pull/13461


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-05 Thread via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1185820299


##
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
 }
 
+@Test
+public void testCorrectnessWriteUnsignedVarlong() {
+// The old well-known implementation for writeVarlong.
+LongFunction<ByteBuffer> simpleImplementation = (long value) -> {
+ByteBuffer buffer = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+while ((value & 0xffffffffffffff80L) != 0L) {
+byte b = (byte) ((value & 0x7f) | 0x80);
+buffer.put(b);
+value >>>= 7;
+}
+buffer.put((byte) value);
+
+return buffer;
+};
+
+// compare the full range of values
+final ByteBuffer actual = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+for (long i = 1; i < Long.MAX_VALUE && i >= 0; i = i << 1) {
+ByteUtils.writeUnsignedVarlong(i, actual);
+final ByteBuffer expected = simpleImplementation.apply(i);
+assertArrayEquals(expected.array(), actual.array(), 
"Implementations do not match for number=" + i);
+actual.clear();
+}
+}
+
+@Test
+public void testCorrectnessWriteUnsignedVarint() {
+// The old well-known implementation for writeUnsignedVarint.
+IntFunction<ByteBuffer> simpleImplementation = (int value) -> {
+ByteBuffer buffer = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+while (true) {
+if ((value & ~0x7F) == 0) {
+buffer.put((byte) value);
+break;
+} else {
+buffer.put((byte) ((value & 0x7F) | 0x80));
+value >>>= 7;
+}
+}
+
+return buffer;
+};
+
+// compare the full range of values
+final ByteBuffer actual = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) {
+ByteUtils.writeUnsignedVarint(i, actual);
+final ByteBuffer expected = simpleImplementation.apply(i);
+assertArrayEquals(expected.array(), actual.array(), 
"Implementations do not match for integer=" + i);
+actual.clear();
+}
+}
+
+@Test
+public void testCorrectnessReadUnsignedVarint() {
+// The old well-known implementation for readUnsignedVarint
+Function<ByteBuffer, Integer> simpleImplementation = (ByteBuffer buffer) -> {
+int value = 0;
+int i = 0;
+int b;
+while (((b = buffer.get()) & 0x80) != 0) {
+value |= (b & 0x7f) << i;
+i += 7;
+if (i > 28)
+throw new IllegalArgumentException("Invalid varint");
+}
+value |= b << i;
+return value;
+};
+
+// compare the full range of values
+final ByteBuffer testData = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) {
+ByteUtils.writeUnsignedVarint(i, testData);
+// prepare buffer for reading
+testData.flip();
+final int actual = 
ByteUtils.readUnsignedVarint(testData.duplicate());
+final int expected = simpleImplementation.apply(testData);
+assertEquals(expected, actual);
+testData.clear();
+}
+}
+
+@Test
+public void testCorrectnessReadUnsignedVarlong() {

Review Comment:
   done in the latest commit.
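   
   As a quick illustration of the encoding under test, a round trip through the 
public ByteUtils helpers (a sketch; assumes kafka-clients on the classpath):
   
   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.utils.ByteUtils;
   
   public class VarintRoundTrip {
       public static void main(String[] args) {
           // An unsigned varint carries 7 payload bits per byte, so a 32-bit
           // value needs at most 5 bytes; 300 encodes as 0xAC 0x02.
           ByteBuffer buf = ByteBuffer.allocate(5);
           ByteUtils.writeUnsignedVarint(300, buf);
           buf.flip();
           System.out.println(ByteUtils.readUnsignedVarint(buf)); // prints 300
       }
   }
   ```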



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-05 Thread via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1185820116


##
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L);
 }
 
+@Test
+public void testCorrectnessWriteUnsignedVarlong() {

Review Comment:
   Done in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14502) Implement LeaveGroup API

2023-05-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-14502:
---

Assignee: Jeff Kim

> Implement LeaveGroup API
> 
>
> Key: KAFKA-14502
> URL: https://issues.apache.org/jira/browse/KAFKA-14502
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Jeff Kim
>Priority: Major
>
> Implement LeaveGroup API in the new Group Coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14501) Implement Heartbeat API

2023-05-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-14501:
---

Assignee: Jeff Kim

> Implement Heartbeat API
> ---
>
> Key: KAFKA-14501
> URL: https://issues.apache.org/jira/browse/KAFKA-14501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Jeff Kim
>Priority: Major
>
> Implement Heartbeat API in the new Group Coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14515) Implement uniform broker side assignor

2023-05-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-14515:
---

Assignee: David Jacot

> Implement uniform broker side assignor
> --
>
> Key: KAFKA-14515
> URL: https://issues.apache.org/jira/browse/KAFKA-14515
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14514) Implement range broker side assignor

2023-05-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-14514:
---

Assignee: David Jacot

> Implement range broker side assignor
> 
>
> Key: KAFKA-14514
> URL: https://issues.apache.org/jira/browse/KAFKA-14514
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-05-05 Thread via GitHub


divijvaidya commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1535884087

   @hudeqi thanks for reminding me about this. I will review it next week. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0

2023-05-05 Thread via GitHub


divijvaidya commented on PR #13662:
URL: https://github.com/apache/kafka/pull/13662#issuecomment-1535880466

   @bmscomp this is a nice PR and I support it. I opened the other one 
( https://github.com/apache/kafka/pull/13673 ) for minor version upgrades, 
because minor versions carry lower risk and require less detailed review from 
reviewers. Ideally, we would merge the minor version updates into the upcoming 
3.5 (safe, since they are only minor upgrades) and then target major version 
upgrades of key dependencies, such as this PR, for 3.6. That said, let's hear 
what other folks in the community have to say about this.
   
   (If the minor version PR doesn't make it into 3.5, I will remove the Jackson 
changes from it so that Jackson can be addressed by this PR.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14963) Incorrect partition count metrics for kraft controllers

2023-05-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14963.
---
Resolution: Fixed

> Incorrect partition count metrics for kraft controllers
> ---
>
> Key: KAFKA-14963
> URL: https://issues.apache.org/jira/browse/KAFKA-14963
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Affects Versions: 3.4.0
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.4.1
>
>
> It is possible for the KRaft controller to report more partitions than are 
> available in the cluster. This is because the following test fails against 
> 3.4.0:
> {code:java}
>       @Test
>       public void testPartitionCountDecreased() {
>           ControllerMetrics metrics = new MockControllerMetrics();
>           ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
>           Uuid createTopicId = Uuid.randomUuid();
>           Uuid createPartitionTopicId = new Uuid(
>               createTopicId.getMostSignificantBits(),
>               createTopicId.getLeastSignificantBits()
>           );
>           Uuid removeTopicId = new Uuid(createTopicId.getMostSignificantBits(), createTopicId.getLeastSignificantBits());
>           manager.replay(topicRecord("test", createTopicId));
>           manager.replay(partitionRecord(createPartitionTopicId, 0, 0, Arrays.asList(0, 1, 2)));
>           manager.replay(partitionRecord(createPartitionTopicId, 1, 0, Arrays.asList(0, 1, 2)));
>           manager.replay(removeTopicRecord(removeTopicId));
>           assertEquals(0, metrics.globalPartitionCount());
>       }
> {code}
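
Worth noting: the test deliberately rebuilds Uuid instances from the same bits, 
so the metrics manager must key topic removal on Uuid value equality rather 
than object identity. A minimal illustration of that distinction (added 
context, not part of the original report):

{code:java}
Uuid a = Uuid.randomUuid();
Uuid b = new Uuid(a.getMostSignificantBits(), a.getLeastSignificantBits());
assertEquals(a, b);   // equal by value
assertNotSame(a, b);  // yet distinct instances
{code}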



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14805) KRaft Controller shouldn't allow metadata updates before migration starts

2023-05-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14805:
--
Fix Version/s: (was: 3.4.1)

> KRaft Controller shouldn't allow metadata updates before migration starts
> -
>
> Key: KAFKA-14805
> URL: https://issues.apache.org/jira/browse/KAFKA-14805
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.5.0
>
>
> When starting a ZK to KRaft migration, the new KRaft quorum should not accept 
> external metadata updates from things like CreateTopics or 
> AllocateProducerIds. Having metadata present in the log prior to the 
> migration can lead to undefined state, which is not great.
> This is currently causing test failures on trunk because some producer is 
> allocating a producer ID between the time the KRaft quorum starts and the 
> migration starts.
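
A hypothetical sketch of the kind of guard described above (the state and 
message here are illustrative only, not the actual controller API):

{code:java}
// Reject externally driven metadata writes until the ZK migration has started.
if (migrationState == MigrationState.WAITING_FOR_MIGRATION) {
    throw new NotControllerException("Metadata updates are not allowed before the ZK migration starts");
}
{code}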



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-05-05 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1185779424


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
 coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID);
 assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
 }
 
+@ParameterizedTest
+@ValueSource(booleans = { true, false })
+public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+// Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error.
+// Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt.
+// Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+// Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this
+// timer never moves forward once the network client is invoked. If there is no available
+// response to consume, its internal poll loop never completes. Hence, the timeout needs to be
+// enforced in the ConsumerCoordinator, and we need to make sure there are enough responses
+// queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.).
+int offsetCommitCalls = 5;
+long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+IntStream.range(0, offsetCommitCalls).forEach(__ ->
+prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID));
+
+// UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as
+// the timeout allows. Note that since topic ids are not part of the public API of the consumer,
+// we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the
+// offset commit failed is returned.
+
+Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+t1p,
+new OffsetAndMetadata(100L, "metadata")
+);
+
+if (commitSync) {
+assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs)));
+
+} else {
+AtomicBoolean callbackInvoked = new AtomicBoolean();
+coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+assertSame(inputOffsets, offsets);
+assertEquals(RetriableCommitFailedException.class, exception.getClass());
+assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
+callbackInvoked.set(true);
+});
+
+coordinator.invokeCompletedOffsetCommitCallbacks();
+assertTrue(callbackInvoked.get());
+}
+}
+
+@ParameterizedTest
+@ValueSource(booleans = { true, false })
+public void testRetryCommitUnknownTopicId(boolean commitSync) {
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID)));
+client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE)));
+
+Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+t1p,
+new OffsetAndMetadata(100L, "metadata")
+);
+
+if (commitSync) {
+assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+
+} else {
+AtomicBoolean callbackInvoked = new AtomicBoolean();
+coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+// Unlike the commit offset sync API, the async API does not retry.

Review Comment:
   That is right. Christo, maybe you can create two separate tests for these 
cases and factor the common code into a method?
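   
   For context, the behavioral difference under test: commitOffsetsSync retries 
retriable errors until its timer expires, while commitOffsetsAsync surfaces the 
first failure to the callback. A rough sketch of the sync-side loop (assumed 
shape, not the coordinator's exact code):
   
   ```java
   // Retry retriable commit failures until success or until the timer expires.
   boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
       do {
           RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
           client.poll(future, timer);
           if (future.succeeded())
               return true;
           if (!future.isRetriable())
               throw future.exception();
           timer.sleep(rebalanceConfig.retryBackoffMs);
       } while (timer.notExpired());
       return false;
   }
   ```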



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-05 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185752497


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+@Test
+public void testTransitionFromNewTargetToRevoke() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 3, 4, 5),
+mkTopicAssignment(topicId2, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertEquals(10, updatedMember.memberEpoch());
+assertEquals(11, updatedMember.nextMemberEpoch());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 3),
+mkTopicAssignment(topicId2, 6)
+), updatedMember.assignedPartitions());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2),
+mkTopicAssignment(topicId2, 4, 5)
+), updatedMember.partitionsPendingRevocation());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 4, 5),
+mkTopicAssignment(topicId2, 7, 8)
+), updatedMember.partitionsPendingAssignment());
+}
+
+@Test
+public void testTransitionFromNewTargetToAssigning() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+
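
The assertions above encode a simple set algebra over the member's current and 
target partitions; a sketch of the relationship (illustrative helper code, not 
the builder's implementation):

```java
import java.util.HashSet;
import java.util.Set;

static Set<Integer> intersect(Set<Integer> a, Set<Integer> b) {
    Set<Integer> r = new HashSet<>(a);
    r.retainAll(b);
    return r;
}

static Set<Integer> minus(Set<Integer> a, Set<Integer> b) {
    Set<Integer> r = new HashSet<>(a);
    r.removeAll(b);
    return r;
}

// Per topic: assignedPartitions          = intersect(current, target) -> kept immediately
//            partitionsPendingRevocation = minus(current, target)     -> revoked first (REVOKING)
//            partitionsPendingAssignment = minus(target, current)     -> granted once free (ASSIGNING)
```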

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-05 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185749017


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+@Test
+public void testTransitionFromNewTargetToRevoke() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 3, 4, 5),
+mkTopicAssignment(topicId2, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertEquals(10, updatedMember.memberEpoch());
+assertEquals(11, updatedMember.nextMemberEpoch());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 3),
+mkTopicAssignment(topicId2, 6)
+), updatedMember.assignedPartitions());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2),
+mkTopicAssignment(topicId2, 4, 5)
+), updatedMember.partitionsPendingRevocation());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 4, 5),
+mkTopicAssignment(topicId2, 7, 8)
+), updatedMember.partitionsPendingAssignment());
+}
+
+@Test
+public void testTransitionFromNewTargetToAssigning() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+

[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-05 Thread via GitHub


showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185732188


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer<Integer, String> producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer<Integer, String> get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 
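
For reference, the two send modes described in the updated javadoc differ only 
in how the result of KafkaProducer#send is consumed; a minimal sketch (method 
names mirror the diff, bodies are illustrative):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.concurrent.ExecutionException;

// Async: return immediately and handle the outcome in a callback.
static void asyncSend(KafkaProducer<Integer, String> producer, String topic, int key) {
    producer.send(new ProducerRecord<>(topic, key, "test" + key), (metadata, exception) -> {
        if (exception != null)
            exception.printStackTrace();
        else
            System.out.printf("Sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
    });
}

// Sync: block on the returned Future until the broker responds (or fails).
static RecordMetadata syncSend(KafkaProducer<Integer, String> producer, String topic, int key)
        throws ExecutionException, InterruptedException {
    return producer.send(new ProducerRecord<>(topic, key, "test" + key)).get();
}
```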

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-05 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer<Integer, String> producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer<Integer, String> get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 
