[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185752497


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+@Test
+public void testTransitionFromNewTargetToRevoke() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 3, 4, 5),
+mkTopicAssignment(topicId2, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.REVOKING, 
updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertEquals(10, updatedMember.memberEpoch());
+assertEquals(11, updatedMember.nextMemberEpoch());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 3),
+mkTopicAssignment(topicId2, 6)
+), updatedMember.assignedPartitions());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2),
+mkTopicAssignment(topicId2, 4, 5)
+), updatedMember.partitionsPendingRevocation());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 4, 5),
+mkTopicAssignment(topicId2, 7, 8)
+), updatedMember.partitionsPendingAssignment());
+}
+
+@Test
+public void testTransitionFromNewTargetToAssigning() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, 
updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertE

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185749017


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+@Test
+public void testTransitionFromNewTargetToRevoke() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 3, 4, 5),
+mkTopicAssignment(topicId2, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.REVOKING, 
updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertEquals(10, updatedMember.memberEpoch());
+assertEquals(11, updatedMember.nextMemberEpoch());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 3),
+mkTopicAssignment(topicId2, 6)
+), updatedMember.assignedPartitions());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2),
+mkTopicAssignment(topicId2, 4, 5)
+), updatedMember.partitionsPendingRevocation());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 4, 5),
+mkTopicAssignment(topicId2, 7, 8)
+), updatedMember.partitionsPendingAssignment());
+}
+
+@Test
+public void testTransitionFromNewTargetToAssigning() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, 
updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertE

[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185732188


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 
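
(The diff above is truncated in the archive. For context, here is a minimal,
self-contained sketch of the two send modes the new Javadoc describes; the
syncSend/asyncSend signatures below are simplified assumptions rather than the
PR's actual helpers.)

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;

public class SendModes {
    // Sync mode: block on the Future returned by send() until the broker responds.
    static RecordMetadata syncSend(KafkaProducer<Integer, String> producer,
                                   String topic, int key, String value)
            throws ExecutionException, InterruptedException {
        return producer.send(new ProducerRecord<>(topic, key, value)).get();
    }

    // Async mode: send() returns immediately; the callback runs on the producer's
    // I/O thread once the broker response (or a fatal error) arrives.
    static void asyncSend(KafkaProducer<Integer, String> producer,
                          String topic, int key, String value) {
        producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("Sent key=%d to %s-%d@%d%n",
                        key, metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
}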

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 

[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-05-04 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1535663733

   There are indeed recovery methods for this issue, such as directly deleting 
the future log on disk and restarting the corresponding broker. However, if the 
future log of a failed source partition is not deleted immediately when the 
partition is marked as failed, would it be worth adding a monitoring metric for 
this situation so that it can be detected in time? @divijvaidya @chia7712 
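
(A rough illustration of the suggested metric, using the Yammer metrics API
that the broker uses elsewhere; the gauge's group/type/name strings and the
surrounding class are invented for illustration only.)

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;

import java.util.concurrent.atomic.AtomicInteger;

public class FailedFutureReplicaMetrics {
    // Count of partitions whose future-log replication has been marked as failed.
    private final AtomicInteger failedFutureReplicas = new AtomicInteger(0);

    public FailedFutureReplicaMetrics() {
        Metrics.defaultRegistry().newGauge(
                new MetricName("kafka.server", "ReplicaAlterLogDirsManager",
                        "FailedFutureReplicasCount"),
                new Gauge<Integer>() {
                    @Override
                    public Integer value() {
                        return failedFutureReplicas.get();
                    }
                });
    }

    public void markFailed()    { failedFutureReplicas.incrementAndGet(); }
    public void markRecovered() { failedFutureReplicas.decrementAndGet(); }
}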


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-04 Thread via GitHub


showuon commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1185666434


##
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L);
 }
 
+@Test
+public void testCorrectnessWriteUnsignedVarlong() {

Review Comment:
   I agree. So maybe we can `disable` them for now and add a comment for 
future use, like what we did 
[here](https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala#L1410).
 After all, the current varLong test just runs the same logic and verifies 
that the results match, right?
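
(A minimal JUnit 5 sketch of that suggestion; the test name comes from the diff
above, while the cross-checked encoder calls are hypothetical placeholders.)

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class ByteUtilsCrossCheckTest {

    // Kept only to cross-check a new varlong implementation against the old one;
    // disabled until the encoding logic is touched again.
    @Disabled("Cross-check test, re-enable when changing the varlong encoding")
    @Test
    public void testCorrectnessWriteUnsignedVarlong() {
        // for (long v : interestingValues())
        //     assertEquals(oldWriteUnsignedVarlong(v), newWriteUnsignedVarlong(v));
    }
}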



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185665409


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 

[GitHub] [kafka] showuon commented on a diff in pull request #13660: KAFKA-14662: Update the ACL list in the doc

2023-05-04 Thread via GitHub


showuon commented on code in PR #13660:
URL: https://github.com/apache/kafka/pull/13660#discussion_r1185661328


##
docs/security.html:
##
@@ -2089,6 +2089,144 @@ 

[GitHub] [kafka] hudeqi commented on a diff in pull request #13617: MINOR:code optimization in QuorumController

2023-05-04 Thread via GitHub


hudeqi commented on code in PR #13617:
URL: https://github.com/apache/kafka/pull/13617#discussion_r1185655736


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1635,7 +1636,7 @@ private enum ImbalanceSchedule {
 /**
  * Tracks if a snapshot generate was scheduled.
  */
-private boolean generateSnapshotScheduled = false;
+private final boolean generateSnapshotScheduled;

Review Comment:
   It seems that this variable has already been deleted in #13540 and can be 
directly ignored...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13617: MINOR:code optimization in QuorumController

2023-05-04 Thread via GitHub


hudeqi commented on code in PR #13617:
URL: https://github.com/apache/kafka/pull/13617#discussion_r1185653614


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1635,7 +1636,7 @@ private enum ImbalanceSchedule {
 /**
  * Tracks if a snapshot generate was scheduled.
  */
-private boolean generateSnapshotScheduled = false;
+private final boolean generateSnapshotScheduled;

Review Comment:
   > If the value is always set to `false`, why have the variable at all? 🤔
   
   Hi, I would also like to delete this variable outright, since it is not 
used anywhere, but I am not sure whether it will be needed in subsequent 
development iterations. @kirktrue 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14914) binarySearch in AbstactIndex may execute with infinite loop

2023-05-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719585#comment-17719585
 ] 

Luke Chen commented on KAFKA-14914:
---

[~flashmouse], thanks for reporting this issue. Do the offset indexes still 
exist? Could you upload them for investigation?

> binarySearch in AbstactIndex may execute with infinite loop
> ---
>
> Key: KAFKA-14914
> URL: https://issues.apache.org/jira/browse/KAFKA-14914
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0
>Reporter: li xiangyuan
>Priority: Major
> Attachments: stack.1.txt, stack.2.txt, stack.3.txt
>
>
> Recently, our servers in the production environment have been suddenly 
> stopping handling requests (3 times in less than 10 days so far). Please 
> check the uploaded stack files: they show that one ioThread 
> (data-plane-kafka-request-handler-11) holds the read lock of a Partition's 
> leaderIsrUpdateLock and keeps running the binarySearch function. Once any 
> thread (kafka-scheduler-2) needs write mode on this lock, all requests that 
> read this partition and need the read-mode lock use up all ioThreads, and 
> then the broker cannot handle any request.
> The 3 stack files were captured at intervals of about 6 minutes. From my 
> standpoint, the binarySearch function is clearly causing the deadlock, and I 
> suspect the index entries in the offset index (at least in the mmap) are not 
> sorted.
>  
> Detailed information:
> The problem appeared on 2 brokers.
> Broker version: 2.4.0
> JVM: OpenJDK 11
> Hardware: AWS c7g 4xlarge. This is an arm64 server; we recently upgraded our 
> servers from c6g 4xlarge to this type. We never hit this problem on c6g, so 
> we don't know whether arm or the AWS c7g server type has any issue.
> Other: once we restart the broker, it recovers, so we suspect the offset 
> index file is not corrupted and something may be wrong with the mmap.
> Please give any suggestions for solving this problem, thanks.
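
(Background on how a binary search can appear stuck: a lower-bound search only
terminates if every iteration provably shrinks the range. Below is a
deliberately broken sketch for illustration; it is not Kafka's AbstractIndex
code.)

// A lower-bound binary search that spins forever when the midpoint is rounded
// down: with values = {1, 5} and target = 3, mid stays equal to lo.
static int lowerBoundBuggy(long[] values, long target) {
    int lo = 0;
    int hi = values.length - 1;
    while (lo < hi) {
        int mid = (lo + hi) / 2;   // rounds down
        if (values[mid] <= target)
            lo = mid;              // no progress once hi == lo + 1
        else
            hi = mid - 1;
    }
    return lo;
}
// Rounding the midpoint up, mid = (lo + hi + 1) / 2, restores the guarantee
// that every iteration shrinks the search range.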



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clayburn commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-04 Thread via GitHub


clayburn commented on code in PR #13676:
URL: https://github.com/apache/kafka/pull/13676#discussion_r1185635216


##
build.gradle:
##
@@ -39,7 +39,6 @@ plugins {
   id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.8"
 
   id "com.github.spotbugs" version '5.0.13' apply false
-  id 'org.gradle.test-retry' version '1.5.2' apply false

Review Comment:
   That's what this PR proposes, publishing build scans to the Gradle 
Enterprise instance at https://ge.apache.org. You can see the configuration in 
the `settings.gradle` portion of the PR.
   
   Because the Gradle Enterprise Gradle Plugin bundles in the test-retry 
plugin, it is necessary to remove these lines. Otherwise test-retry will be 
loaded twice and fail. The retry functionality remains the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-04 Thread via GitHub


mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1185622586


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and 
upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)

Review Comment:
   We have a guarantee. Inside `do_stop_start_bounce` we wait until the 
processor prints that it has processed records.
   
   Not sure if we want to tighten this check, but I think it should be ok?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-04 Thread via GitHub


bmscomp commented on code in PR #13676:
URL: https://github.com/apache/kafka/pull/13676#discussion_r1185622030


##
build.gradle:
##
@@ -39,7 +39,6 @@ plugins {
   id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.8"
 
   id "com.github.spotbugs" version '5.0.13' apply false
-  id 'org.gradle.test-retry' version '1.5.2' apply false

Review Comment:
   @clayburn Is Apache Kafka using Gradle Enterprise? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-04 Thread via GitHub


mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1185621879


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and 
upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'B'
+extra_properties['test.expected_agg_values'] = 'A,B'
+for p in self.processors[:-1]:
+self.do_stop_start_bounce(p, from_version[:-2], to_version, 
counter, extra_properties)

Review Comment:
   Yes, `[n:m]` returns a sub-string (`from_version` is a string), from index 
`n` to `m` (`m` exclusive). If you omit `n`, it slices from the beginning; if 
you omit `m`, it slices to the end.
   
   A negative index counts from the end, so here we take "from the beginning" 
to "second to last", i.e., we cut off the last two chars, the `.x` bug-fix 
suffix (for example, `"3.4.1"[:-2]` yields `"3.4"`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-04 Thread via GitHub


mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1185618918


##
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##
@@ -106,7 +106,11 @@ private static class ValueList {
 }
 
 int next() {
-return (index < values.length) ? values[index++] : -1;
+final int v = values[index++];
+if (index >= values.length) {
+index = 0;
+}

Review Comment:
   Seems the comment is outdated. The custom TS-extractor was removed years 
ago: 
https://github.com/apache/kafka/commit/52e397962b624f3c881b6f99e71c94da32cf6a33
   
   Let me delete the comment.
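
(The hunk above is cut off in the archive; based on the visible logic, the
wrap-around accessor presumably ends like this:)

int next() {
    final int v = values[index++];
    if (index >= values.length) {
        index = 0; // wrap around instead of returning -1, so values repeat
    }
    return v;
}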



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in cooperative

2023-05-04 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719572#comment-17719572
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13891:


Thanks [~pnee] – just to round out the above, the actual fix was in 
[pull/13550|https://github.com/apache/kafka/pull/13550]

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in cooperative
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0, 3.4.1
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when sync group failed 
> with a rebalanceInProgress error. So the previous bug still exists, and it 
> may cause the consumer to rebalance many rounds before it finally stabilizes.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced the group successfully with generation 1 
> *(with ownedPartition P1/P2)*
>  # a new rebalance started with generation 2; consumer A joined successfully, 
> but somehow consumer A didn't send out its sync group immediately
>  # the other consumers completed the sync group successfully in generation 2, 
> except consumer A
>  # after consumer A sent out its sync group, a new rebalance had started with 
> generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync 
> group response
>  # when receiving REBALANCE_IN_PROGRESS, consumer A re-joins the group with 
> generation 3, but with the assignment (ownedPartition) from generation 1
>  # so now an out-of-date ownedPartition has been sent, and unexpected results 
> follow
>  # *after the generation-3 rebalance, consumer A got partitions P3/P4; the 
> ownedPartition is ignored because of the old generation*
>  # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
>  # *if some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue happens again, resulting in many rounds of 
> rebalance before the group is stable*
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bmscomp commented on pull request #13673: MINOR: Update dependencies (minor versions only)

2023-05-04 Thread via GitHub


bmscomp commented on PR #13673:
URL: https://github.com/apache/kafka/pull/13673#issuecomment-1535547766

   It's a good thing to keep dependencies up to date. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance

2023-05-04 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719570#comment-17719570
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14016:


Nice job [~pnee] – thanks for taking the time to patch this finally. To 
clarify, since there are a lot of tickets and PRs floating around here, the 
assignor fix is in [pull/13550|https://github.com/apache/kafka/pull/13550]

> Revoke more partitions than expected in Cooperative rebalance
> -
>
> Key: KAFKA-14016
> URL: https://issues.apache.org/jira/browse/KAFKA-14016
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
>  Labels: new-rebalance-should-fix
> Fix For: 3.5.0, 3.4.1
>
> Attachments: CooperativeStickyAssignorBugReproduction.java
>
>
> In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some 
> consumers didn't reset their generation and state after a sync group failed 
> with a REBALANCE_IN_PROGRESS error.
> So we fixed it by resetting the generationId (but not the memberId) when a 
> sync group fails with a REBALANCE_IN_PROGRESS error.
> But this change missed the reset part, so another change made in 
> https://issues.apache.org/jira/browse/KAFKA-13891 makes this work.
> After applying this change, we found that sometimes a consumer will revoke 
> almost 2/3 of its partitions with cooperative rebalancing enabled, because 
> if a consumer does a very quick re-join, other consumers will get 
> REBALANCE_IN_PROGRESS in syncGroup and revoke their partitions before 
> re-joining. Example:
>  # consumers A1-A10 (ten consumers) joined and synced the group successfully 
> with generation 1
>  # a new consumer B1 joined and started a rebalance
>  # all consumers joined successfully, and then A1 needs to revoke a partition 
> to transfer it to B1
>  # A1 does a very quick syncGroup and re-join, because it revoked a partition
>  # A2-A10 didn't send syncGroup before A1 re-joined, so when they do send 
> syncGroup, they get REBALANCE_IN_PROGRESS
>  # A2-A10 revoke their partitions and re-join
> So in this rebalance almost every partition is revoked, which greatly 
> reduces the benefit of cooperative rebalancing.
> I think instead of "{*}resetStateAndRejoin{*} when 
> *RebalanceInProgressException* errors happen in {*}sync group{*}" we need 
> another way to fix it.
> Here is my proposal:
>  # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891
>  # in the server Coordinator's handleSyncGroup, when the generationId has 
> been checked and the group state is PreparingRebalance, we can send the 
> assignment along with the error code REBALANCE_IN_PROGRESS (I think it's 
> safe since we verified the generation first)
>  # when the client gets the REBALANCE_IN_PROGRESS error, apply the 
> assignment first and then set rejoinNeeded = true to make it re-join 
> immediately
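
(For background, cooperative rebalancing as discussed in this ticket is enabled
on the consumer via the assignor configuration; a minimal sketch, not taken
from the attached reproducer.)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class CooperativeConsumerExample {
    public static KafkaConsumer<String, String> create(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Incremental (cooperative) rebalancing: only partitions that actually move
        // between members are revoked, instead of eagerly revoking everything.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        return new KafkaConsumer<>(props);
    }
}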



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-05-04 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1185605065


##
metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CapturingTopicMigrationClient implements TopicMigrationClient {
+public List deletedTopics = new ArrayList<>();
+public List createdTopics = new ArrayList<>();
+public LinkedHashMap> updatedTopicPartitions = new 
LinkedHashMap<>();
+
+public void reset() {
+createdTopics.clear();
+updatedTopicPartitions.clear();
+deletedTopics.clear();
+}
+
+
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+

Review Comment:
   OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-05-04 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1185604698


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.image.AclsDelta;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.ClientQuotasDelta;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ConfigurationsDelta;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KRaftMigrationZkWriter {
+private final MigrationClient migrationClient;
+private final BiConsumer<String, KRaftMigrationOperation> operationConsumer;
+
+public KRaftMigrationZkWriter(
+MigrationClient migrationClient,
+BiConsumer<String, KRaftMigrationOperation> operationConsumer
+) {
+this.migrationClient = migrationClient;
+this.operationConsumer = operationConsumer;
+}
+
+public void handleSnapshot(MetadataImage image) {
+handleTopicsSnapshot(image.topics());
+handleConfigsSnapshot(image.configs());
+handleClientQuotasSnapshot(image.clientQuotas());
+operationConsumer.accept("Setting next producer ID", migrationState ->
+
migrationClient.writeProducerId(image.producerIds().highestSeenProducerId(), 
migrationState));
+handleAclsSnapshot(image.acls());
+}
+
+public void handleDelta(MetadataImage previousImage, MetadataImage image, MetadataDelta delta) {

Review Comment:
   as per the above discussion, this is not needed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-05-04 Thread via GitHub


cmccabe commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1185604580


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.image.AclsDelta;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.ClientQuotasDelta;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ConfigurationsDelta;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KRaftMigrationZkWriter {
+private final MigrationClient migrationClient;
+private final BiConsumer<String, KRaftMigrationOperation> operationConsumer;
+
+public KRaftMigrationZkWriter(
+MigrationClient migrationClient,
+BiConsumer<String, KRaftMigrationOperation> operationConsumer
+) {
+this.migrationClient = migrationClient;
+this.operationConsumer = operationConsumer;
+}
+
+public void handleSnapshot(MetadataImage image) {

Review Comment:
   hmm, I guess that's true. we don't need it since the brokers shouldn't be 
reading (as opposed to writing) there...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-05-04 Thread via GitHub


mumrah commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1185602073


##
metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CapturingTopicMigrationClient implements TopicMigrationClient {
+public List<String> deletedTopics = new ArrayList<>();
+public List<String> createdTopics = new ArrayList<>();
+public LinkedHashMap<String, Set<Integer>> updatedTopicPartitions = new LinkedHashMap<>();
+
+public void reset() {
+createdTopics.clear();
+updatedTopicPartitions.clear();
+deletedTopics.clear();
+}
+
+
+@Override
+public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+

Review Comment:
   Not all of the usages implement iterateTopics. For example, tests that just 
exercise the KRaft delta -> ZK path do not need it; a sketch of that pattern 
follows below.
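   
   For illustration, the pattern here is a capturing test double whose unused 
interface methods are left as no-ops. A minimal standalone sketch (all names 
are invented for the example; this is not the Kafka interface itself):
   
   ```java
public class CapturingDoubleDemo {
    // Invented stand-in for an interface like TopicMigrationClient.
    interface MigrationOps {
        void deleteTopic(String name);
        void iterateTopics();
    }

    // Capturing double: records calls instead of performing them.
    static class CapturingOps implements MigrationOps {
        final java.util.List<String> deleted = new java.util.ArrayList<>();

        @Override
        public void deleteTopic(String name) {
            deleted.add(name);
        }

        // Left empty on purpose: delta-only tests never call it.
        @Override
        public void iterateTopics() { }
    }

    public static void main(String[] args) {
        CapturingOps ops = new CapturingOps();
        ops.deleteTopic("foo"); // the code under test would make this call
        System.out.println(ops.deleted); // prints [foo]
    }
}
   ```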



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-05-04 Thread via GitHub


mumrah commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1185600997


##
core/src/main/scala/kafka/zk/migration/ZkAclMigrationClient.scala:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk.migration
+
+import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, 
VersionedAcls}
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
+import kafka.utils.Logging
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.{KafkaZkClient, ResourceZNode, ZkAclStore, ZkVersion}
+import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
+import org.apache.kafka.common.acl.AccessControlEntry
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.metadata.migration.{AclMigrationClient, 
MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException.Code
+
+import java.util
+import java.util.function.BiConsumer
+import scala.jdk.CollectionConverters._
+
+class ZkAclMigrationClient(
+  zkClient: KafkaZkClient
+) extends AclMigrationClient with Logging {
+
+  private def aclChangeNotificationRequest(resourcePattern: ResourcePattern): CreateRequest = {
+// ZK broker needs the ACL change notification znode to be updated in order to process the new ACLs
+val aclChange = ZkAclStore(resourcePattern.patternType).changeStore.createChangeNode(resourcePattern)
+CreateRequest(aclChange.path, aclChange.bytes, zkClient.defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
+  }
+
+  private def tryWriteAcls(
+resourcePattern: ResourcePattern,
+aclEntries: Set[AclEntry],
+create: Boolean,
+state: ZkMigrationLeadershipState
+  ): Option[ZkMigrationLeadershipState] = wrapZkException {
+val aclData = ResourceZNode.encode(aclEntries)
+
+val request = if (create) {
+  val path = ResourceZNode.path(resourcePattern)
+  CreateRequest(path, aclData, zkClient.defaultAcls(path), CreateMode.PERSISTENT)
+} else {
+  SetDataRequest(ResourceZNode.path(resourcePattern), aclData, ZkVersion.MatchAnyVersion)
+}
+
+val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
+if (responses.head.resultCode.equals(Code.NONODE)) {
+  // Need to call this method again with create=true
+  None
+} else {
+  // Write the ACL notification outside of a metadata multi-op
+  zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
+  Some(state.withMigrationZkVersion(migrationZkVersion))
+}
+  }
+
+  override def writeResourceAcls(
+resourcePattern: ResourcePattern,
+aclsToWrite: util.Collection[AccessControlEntry],
+state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+val acls = aclsToWrite.asScala.map(new AclEntry(_)).toSet
+tryWriteAcls(resourcePattern, acls, create = false, state) match {
+  case Some(newState) => newState
+  case None => tryWriteAcls(resourcePattern, acls, create = true, state) match {
+case Some(newState) => newState
+case None => throw new MigrationClientException(s"Could not write ACLs for resource pattern $resourcePattern")
+  }
+}
+  }
+
+  override def deleteResource(
+resourcePattern: ResourcePattern,
+state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+val request = DeleteRequest(ResourceZNode.path(resourcePattern), ZkVersion.MatchAnyVersion)
+val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
+if (responses.head.resultCode.equals(Code.OK) || responses.head.resultCode.equals(Code.NONODE)) {
+  // Write the ACL notification outside of a metadata multi-op
+  zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
+  state.withMigrationZkVersion(migrationZkVersion)
+} else {
+  throw new MigrationClientException(s"Could not delete ACL for resource pattern $resourcePattern")
+}
+  }
+
+  override def iterateAcls(
+aclConsumer: BiConsumer[ResourcePattern, util
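
The quoted diff is cut off above. The create-vs-update fallback it shows (try a SetData first, fall back to a Create when the node does not exist) is a common ZooKeeper idiom; a standalone sketch of the same control flow, with the ZooKeeper client replaced by an invented in-memory store:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class UpdateOrCreateDemo {
    static final Map<String, String> store = new HashMap<>();

    // Returns empty when the node does not exist yet (the NONODE case above).
    static Optional<String> tryWrite(String path, String data, boolean create) {
        if (!create && !store.containsKey(path)) {
            return Optional.empty();
        }
        store.put(path, data);
        return Optional.of(data);
    }

    public static void main(String[] args) {
        // Invented path. First attempt updates in place; on the NONODE case
        // we retry with create=true, mirroring tryWriteAcls above.
        String written = tryWrite("/acls/topic.foo", "v1", false)
            .orElseGet(() -> tryWrite("/acls/topic.foo", "v1", true)
                .orElseThrow(IllegalStateException::new));
        System.out.println(written); // v1
    }
}
```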

[GitHub] [kafka] mumrah commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration

2023-05-04 Thread via GitHub


mumrah commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1185600867


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.image.AclsDelta;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.ClientQuotasDelta;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ConfigurationsDelta;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KRaftMigrationZkWriter {
+private final MigrationClient migrationClient;
+private final BiConsumer<String, KRaftMigrationOperation> operationConsumer;
+
+public KRaftMigrationZkWriter(
+MigrationClient migrationClient,
+BiConsumer<String, KRaftMigrationOperation> operationConsumer
+) {
+this.migrationClient = migrationClient;
+this.operationConsumer = operationConsumer;
+}
+
+public void handleSnapshot(MetadataImage image) {

Review Comment:
   I thought we decided not to write out the KRaft broker registrations to ZK? 
So far, we have not implemented that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0

2023-05-04 Thread via GitHub


bmscomp commented on PR #13662:
URL: https://github.com/apache/kafka/pull/13662#issuecomment-1535524442

   @divijvaidya  this pull request targets an update to version 2.15.0. Could 
you tell me more about what you meant regarding updating not only minor versions?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test

2023-05-04 Thread via GitHub


mjsax commented on PR #13654:
URL: https://github.com/apache/kafka/pull/13654#issuecomment-1535516492

   Ok. Dug into this a little bit, and some tests are broken because KS is 
broken.
   
   The original PR for https://issues.apache.org/jira/browse/KAFKA-13769 broke 
KS, but it also changed the tests to use randomized `application.id`s and thus 
masked the error it introduced. The issue was reported later via 
https://issues.apache.org/jira/browse/KAFKA-14646 and we fixed it forward.
   
   However, the fact remains that rolling bounce upgrades are broken for the 
2.4 - 3.0 releases as "from_version". I updated this PR to disable the 
corresponding tests.
   
   Retriggered system tests: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5658/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-04 Thread via GitHub


junrao commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1184289360


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1160,48 +1172,100 @@ class ReplicaManager(val config: KafkaConfig,
   fetchPartitionStatus += (topicIdPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
 })
   }
-  val delayedFetch = new DelayedFetch(
-params = params,
-fetchPartitionStatus = fetchPartitionStatus,
-replicaManager = this,
-quota = quota,
-responseCallback = responseCallback
-  )
-
-  // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-  val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp) }
-
-  // try to complete the request immediately, otherwise put it into the 
purgatory;
-  // this is because while the delayed fetch operation is being created, 
new requests
-  // may arrive and hence make this operation completable.
-  delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys)
+
+  if (remoteFetchInfo.isPresent) {
+val key = new TopicPartitionOperationKey(remoteFetchInfo.get.topicPartition.topic(), remoteFetchInfo.get.topicPartition.partition())
+val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
+var remoteFetchTask: Future[Void] = null
+try {
+  remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo.get, (result: RemoteLogReadResult) => {
+remoteFetchResult.complete(result)
+delayedRemoteFetchPurgatory.checkAndComplete(key)
+  })
+} catch {
+  // if the task queue of remote storage reader thread pool is full, return what we currently have
+  // (the data read from local log segment for the other topic-partitions) and an error for the topic-partition that
+  // we couldn't read from remote storage
+  case e: RejectedExecutionException =>
+val fetchPartitionData = logReadResults.map { case (tp, result) =>
+  val r = {
+if (tp.topicPartition().equals(remoteFetchInfo.get.topicPartition))
+  createLogReadResult(e)
+else
+  result
+  }
+
+  tp -> r.toFetchPartitionData(false)
+}
+responseCallback(fetchPartitionData)
+return
+}
+
+// If there is remote data, we will read remote data, instead of waiting for new data.
+val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo.get,
+  fetchPartitionStatus, params, logReadResults, this, responseCallback)
+
+delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
+  } else {
+// If there is not enough data to respond and there is no remote data, we will let the fetch request
+// to wait for new data.

Review Comment:
   let the fetch request to wait => let the fetch request wait 



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1320,6 +1405,33 @@ class ReplicaManager(val config: KafkaConfig,
 result
   }
 
+  private def createLogReadResult(highWatermark: Long,

Review Comment:
   Should this be in `object ReplicaManager`?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1320,6 +1405,33 @@ class ReplicaManager(val config: KafkaConfig,
 result
   }
 
+  private def createLogReadResult(highWatermark: Long,
+  leaderLogStartOffset: Long,
+  leaderLogEndOffset: Long,
+  e: Throwable) = {
+LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+  divergingEpoch = None,
+  highWatermark,
+  leaderLogStartOffset,
+  leaderLogEndOffset,
+  followerLogStartOffset = -1L,
+  fetchTimeMs = -1L,
+  lastStableOffset = None,
+  exception = Some(e))
+  }
+
+  def createLogReadResult(e: Throwable): LogReadResult = {

Review Comment:
   Should this be in `object ReplicaManager`?
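
   For readers less used to Scala: moving a helper like this into `object 
ReplicaManager` is roughly the Java equivalent of making a stateless helper 
static. An illustrative sketch with invented types (not the actual Kafka 
classes):

   ```java
public class StaticHelperDemo {
    // Invented result type standing in for LogReadResult.
    record ReadResult(String error) { }

    // The helper reads no instance state, so it can live at the class level
    // (a Scala companion object) instead of on every ReplicaManager instance.
    static ReadResult createReadResult(Throwable e) {
        return new ReadResult(e.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(createReadResult(new RuntimeException("boom")).error());
    }
}
   ```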



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +622,208 @@ public String toString() {
 }
 }
 
-long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-Optional<Long> offset = Optional.empty();
-Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-if (maybeLog.isPresent()) {
-UnifiedLog log = maybeLog.get();
-Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-if (maybeLeaderEpochFileCache.isDefined()) {
-LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-OptionalInt epoch

[GitHub] [kafka] sfc-gh-japatel commented on pull request #12647: KAFKA-14191: Add end-to-end latency metrics to Connectors

2023-05-04 Thread via GitHub


sfc-gh-japatel commented on PR #12647:
URL: https://github.com/apache/kafka/pull/12647#issuecomment-1535462677

   Hi team, any progress/plans on this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-04 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1185544285


##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -3273,17 +3274,46 @@ class PartitionTest extends AbstractPartitionTest {
   baseOffset = 0L,
   producerId = producerId)
partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
-assertFalse(partition.hasOngoingTransaction(producerId))
+assertEquals(OptionalLong.empty(), txnFirstOffset(producerId))

Review Comment:
   will fix duplicate code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-04 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1185533261


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1090,6 +1090,9 @@ private void maybeFailWithError() {
 } else if (lastError instanceof InvalidProducerEpochException) {
 throw new InvalidProducerEpochException("Producer with transactionalId '" + transactionalId
 + "' and " + producerIdAndEpoch + " attempted to produce with an old epoch");
+} else if (lastError instanceof IllegalStateException) {
+throw new IllegalStateException("Producer with transactionalId '" + transactionalId

Review Comment:
   Message now says:
   
   ```
   Producer with transactionalId '' and  cannot execute transactional
   method because of previous invalid state transition attempt
   ```
   
   The `IllegalStateException` thrown from `maybeFailWithError` includes 
`lastError` as its `cause`. The stack trace would include the message for 
`lastError` which reads:
   
   ```
   TransactionalId : Invalid transition attempted from state 
   to state 
   ```
   
   Here's what it looks like from one of the unit tests:
   
   ```
   Producer with transactionalId 'foobar' and 
ProducerIdAndEpoch(producerId=13131, epoch=1) cannot execute transactional 
method because of previous invalid state transition attempt
   java.lang.IllegalStateException: Producer with transactionalId 'foobar' and 
ProducerIdAndEpoch(producerId=13131, epoch=1) cannot execute transactional 
method because of previous invalid state transition attempt
at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1112)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:324)
at 
org.apache.kafka.clients.producer.internals.TransactionManagerTest.testBackgroundInvalidStateTransitionIsFatal(TransactionManagerTest.java:3422)
. . .
   Caused by: java.lang.IllegalStateException: TransactionalId foobar: Invalid 
transition attempted from state READY to state ABORTABLE_ERROR
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1070)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:467)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:707)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:714)
at 
org.apache.kafka.clients.producer.internals.TransactionManagerTest.lambda$testBackgroundInvalidStateTransitionIsFatal$139(TransactionManagerTest.java:3418)
at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3083)
at 
org.apache.kafka.clients.producer.internals.TransactionManagerTest.testBackgroundInvalidStateTransitionIsFatal(TransactionManagerTest.java:3418)
... 84 more
   ```
   
   Does that seem like sufficient context?
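
   For what it's worth, the two-message layout shown above falls out of plain 
exception chaining; a minimal standalone sketch (messages shortened, not the 
actual TransactionManager code):

   ```java
public class CauseChainingDemo {
    public static void main(String[] args) {
        // Stands in for the recorded lastError inside the transaction manager.
        IllegalStateException lastError = new IllegalStateException(
            "TransactionalId foobar: Invalid transition attempted from state READY to state ABORTABLE_ERROR");
        try {
            // Passing lastError as the cause preserves both messages in the trace.
            throw new IllegalStateException(
                "Producer with transactionalId 'foobar' cannot execute transactional method "
                    + "because of previous invalid state transition attempt", lastError);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            System.out.println("Caused by: " + e.getCause().getMessage());
        }
    }
}
   ```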



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-04 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1185525137


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
 assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
 }
 
+@Test
+public void testMakeIllegalTransitionFatal() {
+doInitTransactions();
+assertTrue(transactionManager.isTransactional());
+
+// Step 1: create a transaction.
+transactionManager.beginTransaction();
+assertTrue(transactionManager.hasOngoingTransaction());
+
+// Step 2: abort a transaction (wait for it to complete) and then 
verify that the transaction manager is
+// left in the READY state.
+TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND);
+runUntil(abortResult::isCompleted);
+abortResult.await();
+assertTrue(abortResult.isSuccessful());
+assertFalse(transactionManager.hasOngoingTransaction());
+assertTrue(transactionManager.isReady());
+
+// Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put
+// the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so
+// verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state.
+ProducerBatch batch = batchWithValue(tp0, "test");
+assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false));
+assertTrue(transactionManager.hasFatalError());
+
+// Step 4: validate that the transactions can't be started, committed
+assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction());
+assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND));
+assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit());
+assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0));
+assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions());
+assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id")));
+}
+
+private void assertThrowsFatalStateException(String methodName, Runnable operation) {
+try {
+operation.run();
+} catch (KafkaException t) {

Review Comment:
   I added a check for `IllegalStateException` in `maybeFailWithError`.
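
   A sketch of what the helper could look like with that cause check made 
explicit (assumed shape, not the actual patch; this would sit inside 
TransactionManagerTest):

   ```java
import org.apache.kafka.common.KafkaException;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

// Assumed shape: assert that the operation throws KafkaException and that the
// recorded cause is the earlier IllegalStateException.
private void assertThrowsFatalStateException(String methodName, Runnable operation) {
    KafkaException e = assertThrows(KafkaException.class, operation::run);
    assertTrue(e.getCause() instanceof IllegalStateException,
        methodName + " should wrap the prior invalid state transition");
}
   ```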



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-04 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1185524662


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3405,6 +3406,52 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
 assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
 }
 
+@Test
+public void testMakeIllegalTransitionFatal() {

Review Comment:
   Added requested tests for foreground errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-05-04 Thread via GitHub


kirktrue commented on PR #13640:
URL: https://github.com/apache/kafka/pull/13640#issuecomment-1535394647

   > With this PR I consistently get failures with the following tests
   
   Thanks for catching that. I was not setting the client ID to the correct 
value for the `KafkaAdminClient`'s `NetworkClient`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clayburn commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-04 Thread via GitHub


clayburn commented on code in PR #13676:
URL: https://github.com/apache/kafka/pull/13676#discussion_r1185496911


##
build.gradle:
##
@@ -39,7 +39,6 @@ plugins {
   id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.8"
 
   id "com.github.spotbugs" version '5.0.13' apply false
-  id 'org.gradle.test-retry' version '1.5.2' apply false

Review Comment:
   The test-retry plugin is bundled with the Gradle Enterprise plugin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clayburn opened a new pull request, #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-05-04 Thread via GitHub


clayburn opened a new pull request, #13676:
URL: https://github.com/apache/kafka/pull/13676

   This PR publishes a build scan for every CI build on Jenkins and GitHub 
Actions and for every local build from an authenticated Apache committer. The 
build will not fail if publishing fails.
   
   The build scans of the Apache Kafka project are published to the Gradle 
Enterprise instance at [ge.apache.org](https://ge.apache.org/), hosted by the 
Apache Software Foundation and run in partnership between the ASF and Gradle. 
This Gradle Enterprise instance has all features and extensions enabled and is 
freely available for use by the Apache Kafka project and all other Apache 
projects.
   
   This pull request switches build scan publishing from the publicly available 
[scans.gradle.com](https://scans.gradle.com/) to 
[ge.apache.org](https://ge.apache.org/). On this Gradle Enterprise instance, 
Apache Kafka will have access not only to all of the published build scans but 
also to other aggregate data features such as:
   
   - Dashboards to view all historical build scans, along with performance 
trends over time
   - Build failure analytics for enhanced investigation and diagnosis of build 
failures
   - Test failure analytics to better understand trends and causes around slow, 
failing, and flaky tests
   
   If interested in exploring a fully populated Gradle Enterprise instance, 
please explore the builds already connected to 
[ge.apache.org](https://ge.apache.org/), the [Spring project’s 
instance](https://ge.spring.io/scans?search.relativeStartTime=P28D&search.timeZoneId=Europe/Zurich),
 or any number of other [OSS 
projects](https://gradle.com/enterprise-customers/oss-projects/) for which we 
sponsor instances of Gradle Enterprise.
   
   Please let me know if there are any questions about the value of Gradle 
Enterprise or the changes in this pull request and I’d be happy to address them.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185475014


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+@Test
+public void testTransitionFromNewTargetToRevoke() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 3, 4, 5),
+mkTopicAssignment(topicId2, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertEquals(10, updatedMember.memberEpoch());
+assertEquals(11, updatedMember.nextMemberEpoch());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 3),
+mkTopicAssignment(topicId2, 6)
+), updatedMember.assignedPartitions());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2),
+mkTopicAssignment(topicId2, 4, 5)
+), updatedMember.partitionsPendingRevocation());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 4, 5),
+mkTopicAssignment(topicId2, 7, 8)
+), updatedMember.partitionsPendingAssignment());
+}
+
+@Test
+public void testTransitionFromNewTargetToAssigning() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+ass

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185460028


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java:
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CurrentAssignmentBuilderTest {
+
+@Test
+public void testTransitionFromNewTargetToRevoke() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 3, 4, 5),
+mkTopicAssignment(topicId2, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+assertEquals(10, updatedMember.memberEpoch());
+assertEquals(11, updatedMember.nextMemberEpoch());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 3),
+mkTopicAssignment(topicId2, 6)
+), updatedMember.assignedPartitions());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2),
+mkTopicAssignment(topicId2, 4, 5)
+), updatedMember.partitionsPendingRevocation());
+assertEquals(mkAssignment(
+mkTopicAssignment(topicId1, 4, 5),
+mkTopicAssignment(topicId2, 7, 8)
+), updatedMember.partitionsPendingAssignment());
+}
+
+@Test
+public void testTransitionFromNewTargetToAssigning() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.setNextMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3),
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state());
+
+Assignment targetAssignment = new Assignment(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3, 4, 5),
+mkTopicAssignment(topicId2, 4, 5, 6, 7, 8)
+));
+
+ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+.withTargetAssignment(11, targetAssignment)
+.withCurrentPartitionEpoch((topicId, partitionId) -> 10)
+.build();
+
+assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state());
+assertEquals(10, updatedMember.previousMemberEpoch());
+ass

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185446129


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).
+ */
+public class GenericGroupMember {
+
+/**
+ * A builder allowing to create a new generic member or update an
+ * existing one.
+ *
+ * Please refer to the javadoc of {{@link GenericGroupMember}} for the
+ * definition of the fields.
+ */
+public static class Builder {
+private final String memberId;
+private Optional<String> groupInstanceId = Optional.empty();
+private String clientId = "";
+private String clientHost = "";
+private int rebalanceTimeoutMs = -1;
+private int sessionTimeoutMs = -1;
+private String protocolType = "";
+private List<Protocol> supportedProtocols = Collections.emptyList();
+private byte[] assignment = new byte[0];
+
+public Builder(String memberId) {
+this.memberId = Objects.requireNonNull(memberId);
+}
+
+public Builder(GenericGroupMember member) {
+Objects.requireNonNull(member);
+
+this.memberId = member.memberId;
+this.groupInstanceId = member.groupInstanceId;
+this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+this.sessionTimeoutMs = member.sessionTimeoutMs;
+this.clientId = member.clientId;
+this.clientHost = member.clientHost;
+this.protocolType = member.protocolType;
+this.supportedProtocols = member.supportedProtocols;
+this.assignment = member.assignment;
+}
+
+public Builder setGroupInstanceId(Optional<String> groupInstanceId) {
+this.groupInstanceId = groupInstanceId;
+return this;
+}
+
+public Builder setClientId(String clientId) {
+this.clientId = clientId;
+return this;
+}
+
+public Builder setClientHost(String clientHost) {
+this.clientHost = clientHost;
+return this;
+}
+
+public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+return this;
+}
+
+public Builder setSessionTimeoutMs(int sessionTimeoutMs) {
+this.sessionTimeoutMs = sessionTimeoutMs;
+return this;
+}
+
+public Builder setProtocolType(String protocolType) {
+this.protocolType = protocolType;
+return this;
+}
+
+public Builder setSupportedProtocols(List<Protocol> protocols) {
+this.supportedProtocols = protocols;
+return this;
+}
+
+public Builder setAssignment(byte[] assignment) {
+this.assignment = assignment;
+return this;
+}
+
+public GenericGroupMember build() {
+return new GenericGroupMember(
+memberId,
+groupInstanceId,
+clientId,
+clientHost,
+rebalanceTimeoutMs,
+sessionTimeoutMs,
+protocolType,
+supportedProtocols,
+assignment
+);
+}
+}
+
+private static class MemberSummary {

Review Comment:
   got it. i'll remove it in this PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185444113


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance 
state,
+ * its rebalance callback will be kept in the 
metadata if the
+ * member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, 
its sync callback
+ *is kept in metadata until the leader provides 
the group assignment
+ *and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+private static class MemberSummary {
+private final String memberId;
+private final Optional<String> groupInstanceId;
+private final String clientId;
+private final String clientHost;
+private final byte[] metadata;
+private final byte[] assignment;
+
+public MemberSummary(String memberId,
+ Optional<String> groupInstanceId,
+ String clientId,
+ String clientHost,
+ byte[] metadata,
+ byte[] assignment) {
+
+this.memberId = memberId;
+this.groupInstanceId = groupInstanceId;
+this.clientId = clientId;
+this.clientHost = clientHost;
+this.metadata = metadata;
+this.assignment = assignment;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Optional<String> getGroupInstanceId() {
+return groupInstanceId;
+}
+
+public String clientId() {
+return clientId;
+}
+
+public String clientHost() {
+return clientHost;
+}
+
+public byte[] metadata() {
+return metadata;
+}
+
+public byte[] assignment() {
+return assignment;
+}
+
+}
+
+/**
+ * The member id.
+ */
+private final String memberId;
+
+/**
+ * The group instance id.
+ */
+private final Optional<String> groupInstanceId;
+
+/**
+ * The client id.
+ */
+private final String clientId;
+
+/**
+ * The client host.
+ */
+private final String clientHost;
+
+/**
+ * The rebalance timeout in milliseconds.
+ */
+private final int rebalanceTimeoutMs;
+
+/**
+ * The session timeout in milliseconds.
+ */
+private final int sessionTimeoutMs;
+
+/**
+ * The protocol type.
+ */
+private final String protocolType;
+
+/**
+ * The list of supported protocols.
+ */
+private final List<Protocol> supportedProtocols;
+
+/**
+ * The assignment stored by the client assignor.
+ */
+private final byte[] assignment;
+
+/**
+ * The callback that is invoked once this member joins the group.
+ */
+private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+/**
+ * The callback that is invoked once this member completes the sync group 
phase.
+ */
+

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185441081


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance 
state,
+ * its rebalance callback will be kept in the 
metadata if the
+ * member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, 
its sync callback
+ *is kept in metadata until the leader provides 
the group assignment
+ *and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+private static class MemberSummary {
+private final String memberId;
+private final Optional<String> groupInstanceId;
+private final String clientId;
+private final String clientHost;
+private final byte[] metadata;
+private final byte[] assignment;
+
+public MemberSummary(String memberId,
+ Optional<String> groupInstanceId,
+ String clientId,
+ String clientHost,
+ byte[] metadata,
+ byte[] assignment) {
+
+this.memberId = memberId;
+this.groupInstanceId = groupInstanceId;
+this.clientId = clientId;
+this.clientHost = clientHost;
+this.metadata = metadata;
+this.assignment = assignment;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Optional<String> getGroupInstanceId() {
+return groupInstanceId;
+}
+
+public String clientId() {
+return clientId;
+}
+
+public String clientHost() {
+return clientHost;
+}
+
+public byte[] metadata() {
+return metadata;
+}
+
+public byte[] assignment() {
+return assignment;
+}
+
+}
+
+/**
+ * The member id.
+ */
+private final String memberId;
+
+/**
+ * The group instance id.
+ */
+private final Optional<String> groupInstanceId;
+
+/**
+ * The client id.
+ */
+private final String clientId;
+
+/**
+ * The client host.
+ */
+private final String clientHost;
+
+/**
+ * The rebalance timeout in milliseconds.
+ */
+private final int rebalanceTimeoutMs;
+
+/**
+ * The session timeout in milliseconds.
+ */
+private final int sessionTimeoutMs;
+
+/**
+ * The protocol type.
+ */
+private final String protocolType;
+
+/**
+ * The list of supported protocols.
+ */
+private final List<Protocol> supportedProtocols;
+
+/**
+ * The assignment stored by the client assignor.
+ */
+private final byte[] assignment;
+
+/**
+ * The callback that is invoked once this member joins the group.
+ */
+private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;
+
+/**
+ * The callback that is invoked once this member completes the sync group phase.
+ */
+
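For context, a rough sketch of how the awaiting-join callback described in the diff might be completed once a rebalance finishes (a minimal sketch: the accessors `awaitingJoinCallback()` and `setAwaitingJoinCallback()` are hypothetical, only the field and its javadoc appear in the quoted code):

static void maybeCompleteJoin(GenericGroupMember member, JoinGroupResponseData response) {
    // If the member sent a JoinGroup request, a future was parked in its
    // metadata; complete it now that the group has finished rebalancing.
    CompletableFuture<JoinGroupResponseData> future = member.awaitingJoinCallback();
    if (future != null) {
        future.complete(response);
        member.setAwaitingJoinCallback(null);
    }
}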

[GitHub] [kafka] hertzsprung commented on pull request #13671: KAFKA-14967: fix NPE in MockAdminClient CreateTopicsResult

2023-05-04 Thread via GitHub


hertzsprung commented on PR #13671:
URL: https://github.com/apache/kafka/pull/13671#issuecomment-1535283665

   `:clients:test:` succeeded locally but fails in CI due to a Gradle configuration problem:
   
   > Could not create task ':clients:test'.
   > 
   > [2023-05-04T16:19:19.121Z]
   > The Gradle Enterprise Gradle plugin is conflicting with the Test Retry 
Gradle plugin and has already added a retry extension to the test task test. 
Please either remove the Test Retry Gradle plugin from this project or disable 
the registration of the retry extension in the Gradle Enterprise Gradle plugin 
by specifying the system property 'gradle.enterprise.testretry.enabled' and 
setting it to 'false'.
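   
   If it helps, the error message itself suggests the workaround: disable the retry extension when invoking the build (assuming CI allows overriding system properties), e.g.
   
   ./gradlew :clients:test -Dgradle.enterprise.testretry.enabled=false
   
   or, equivalently, add `systemProp.gradle.enterprise.testretry.enabled=false` to gradle.properties.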


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe closed pull request #13540: MINOR: improve QuorumController logging

2023-05-04 Thread via GitHub


cmccabe closed pull request #13540: MINOR: improve QuorumController logging
URL: https://github.com/apache/kafka/pull/13540


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1185332905


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance 
state,
+ * its rebalance callback will be kept in the 
metadata if the
+ * member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, 
its sync callback
+ *is kept in metadata until the leader provides 
the group assignment
+ *and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+private static class MemberSummary {
+private final String memberId;
+private final Optional<String> groupInstanceId;
+private final String clientId;
+private final String clientHost;
+private final byte[] metadata;
+private final byte[] assignment;
+
+public MemberSummary(String memberId,
+ Optional<String> groupInstanceId,
+ String clientId,
+ String clientHost,
+ byte[] metadata,
+ byte[] assignment) {
+
+this.memberId = memberId;
+this.groupInstanceId = groupInstanceId;
+this.clientId = clientId;
+this.clientHost = clientHost;
+this.metadata = metadata;
+this.assignment = assignment;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Optional<String> getGroupInstanceId() {
+return groupInstanceId;
+}
+
+public String clientId() {
+return clientId;
+}
+
+public String clientHost() {
+return clientHost;
+}
+
+public byte[] metadata() {
+return metadata;
+}
+
+public byte[] assignment() {
+return assignment;
+}
+
+}
+
+/**
+ * The member id.
+ */
+private final String memberId;
+
+/**
+ * The group instance id.
+ */
+private final Optional<String> groupInstanceId;
+
+/**
+ * The client id.
+ */
+private final String clientId;

Review Comment:
   i removed the final keyword and added a setter method in 
https://github.com/apache/kafka/pull/13663/files#diff-7ec3bedcac3e1a182a8aed96e4e40f0b5ac3295c90bbc862799b7b57e07f983dR494-R499
   
   as it's only used by the group metadata. should i refactor it here or keep 
it as is and change it in https://github.com/apache/kafka/pull/13663?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compli

[jira] [Updated] (KAFKA-14958) Investigate enforcing all batches have the same producer ID

2023-05-04 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-14958:
---
Description: 
KAFKA-14916 was created after I incorrectly assumed transaction ID in the 
produce request indicated all batches were transactional.

Originally this ticket had an action item to ensure all the producer IDs are 
the same in the batches since we send a single txn ID, but we decided this can 
be done in a followup, as we still need to assess if we can enforce this 
without breaking workloads. In KAFKA-14916, we did enforce this, but only for the partitions being verified.

This ticket is that followup. 

  was:
KAFKA-14916 was created after I incorrectly assumed transaction ID in the 
produce request indicated all batches were transactional.

Originally this ticket had an action item to ensure all the producer IDs are 
the same in the batches since we send a single txn ID, but we decided this can 
be done in a followup, as we still need to assess if we can enforce this 
without breaking workloads.

This ticket is that followup. 


> Investigate enforcing all batches have the same producer ID
> ---
>
> Key: KAFKA-14958
> URL: https://issues.apache.org/jira/browse/KAFKA-14958
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Minor
>
> KAFKA-14916 was created after I incorrectly assumed transaction ID in the 
> produce request indicated all batches were transactional.
> Originally this ticket had an action item to ensure all the producer IDs are 
> the same in the batches since we send a single txn ID, but we decided this 
> can be done in a followup, as we still need to assess if we can enforce this 
> without breaking workloads. In KAFKA-14916, we did enforce this, but only for 
> the partitions being verified. 
> This ticket is that followup. 
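
To illustrate the kind of enforcement under investigation, a hypothetical broker-side check (a sketch only, not code from any referenced PR) could compare every batch's producer ID in a produce request against the first one:

// Sketch: reject a transactional produce request whose batches carry
// mixed producer IDs, since the request names a single transactional ID.
long expectedProducerId = batches.get(0).producerId();
for (RecordBatch batch : batches) {
    if (batch.producerId() != expectedProducerId) {
        throw new InvalidRecordException(
            "All batches in a transactional produce request must share one producer ID");
    }
}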



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional

2023-05-04 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-14916.

Resolution: Fixed

> Fix code that assumes transactional ID implies all records are transactional
> 
>
> Key: KAFKA-14916
> URL: https://issues.apache.org/jira/browse/KAFKA-14916
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> KAFKA-14561 wrote code that assumed that if a transactional ID was included, 
> all record batches were transactional and had the same producer ID.
> This work will improve validation and fix the code that assumes all batches 
> are transactional.
> Further, KAFKA-14561 will no longer assume all records are transactional.
> Originally this ticket had an action item to ensure all the producer IDs are 
> the same in the batches since we send a single txn ID, but that can be done 
> in a followup KAFKA-14958, as we still need to assess if we can enforce this 
> without breaking workloads.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13668) Failed cluster authorization should not be fatal for producer

2023-05-04 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-13668.

Fix Version/s: 3.6.0
   Resolution: Fixed

> Failed cluster authorization should not be fatal for producer
> -
>
> Key: KAFKA-13668
> URL: https://issues.apache.org/jira/browse/KAFKA-13668
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.6.0
>
>
> The idempotent producer fails fatally if the initial `InitProducerId` returns 
> CLUSTER_AUTHORIZATION_FAILED. This makes the producer unusable until a new 
> instance is constructed. For some applications, it is more convenient to keep 
> the producer instance active and let the administrator fix the permission 
> problem instead of going into a crash loop. Additionally, most applications 
> will probably not be smart enough to reconstruct the producer instance, so if 
> the application does not handle the error by failing, users will have to 
> restart the application manually. 
> I think it would be better to let the producer retry the `InitProducerId` 
> request as long as the user keeps trying to use the producer. 
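
For illustration, the failure mode described above looks roughly like this from the application side (a sketch, not code from any patch; it assumes the CLUSTER_AUTHORIZATION_FAILED error surfaces on the send future as a ClusterAuthorizationException):

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
    producer.send(record).get();
} catch (ExecutionException e) {
    if (e.getCause() instanceof ClusterAuthorizationException) {
        // Previously fatal: the only recovery was to rebuild the instance,
        // which is the crash-loop style behavior the ticket wants to avoid.
        producer.close();
        producer = new KafkaProducer<>(props);
    }
}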



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14624) State restoration is broken with standby tasks and cache-enabled stores in processor API

2023-05-04 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719425#comment-17719425
 ] 

Lucas Brutschy commented on KAFKA-14624:


This seems to be a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-14172

A fix will be included in 3.5.

[~balajirrao] can you attempt to reproduce the bug with latest master and 
confirm that it's fixed?

> State restoration is broken with standby tasks and cache-enabled stores in 
> processor API
> 
>
> Key: KAFKA-14624
> URL: https://issues.apache.org/jira/browse/KAFKA-14624
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Balaji Rao
>Priority: Major
>
> I found that cache-enabled state stores in PAPI with standby tasks sometimes 
> return stale data when a partition moves from one app instance to another 
> and back. [Here's|https://github.com/balajirrao/kafka-streams-multi-runner] a 
> small project that I used to reproduce the issue.
> I dug around a bit and it seems like it's a bug in standby task state 
> restoration when caching is enabled. If a partition moves from instance 1 to 
> 2 and then back to instance 1,  since the `CachingKeyValueStore` doesn't 
> register a restore callback, it can return potentially stale data for 
> non-dirty keys. 
> I could fix the issue by modifying the `CachingKeyValueStore` to register a 
> restore callback in which the cache restored keys are added to the cache. Is 
> this fix in the right direction?
> {code:java}
> // register the store
> context.register(
> root,
> (RecordBatchingStateRestoreCallback) records -> {
> for (final ConsumerRecord<byte[], byte[]> record : records) {
> put(Bytes.wrap(record.key()), record.value());
> }
> }
> );
> {code}
>  
> I would like to contribute a fix, if I can get some help!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan merged pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-05-04 Thread via GitHub


jolshan merged PR #13607:
URL: https://github.com/apache/kafka/pull/13607


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-04 Thread via GitHub


Hangleton commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1535077333

   Agree that this change should ideally be confined to the Windows platform.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-04 Thread via GitHub


cmccabe commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1535052942

   I'm going to leave a -1 here because there are a number of issues in the 
patch and it's not ready to commit yet. I would encourage you to split this 
patch into several parts and clearly explain what issue each one solves and how.
   
   From the high level, the challenge for us is that Windows does things quite 
differently in some cases, but we don't want to degrade performance on all the 
other platforms. So a lot of thought is probably required here. Ideally other 
platforms would not be impacted at all.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-05-04 Thread via GitHub


jolshan merged PR #12149:
URL: https://github.com/apache/kafka/pull/12149


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185204551


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. 
This represents what
+ *the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke 
before it can transition
+ *to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually 
receive. The partitions
+ *in this set are still owned by other members in the 
group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. 
The state machine starts
+ *   here when the next epoch does not match the 
target epoch. It means that
+ *   a new target assignment has been installed so the 
reconciliation process
+ *   must restart. In this state, the Assigned, 
Revoking and Assigning sets
+ *   are computed. If Revoking is not empty, the state 
machine transitions
+ *   to REVOKE; if Assigning is not empty, it 
transitions to ASSIGNING;
+ *   otherwise it transitions to STABLE.
+ * - REVOKE- This state means that the member must revoke 
partitions before it can
+ *   transition to the next epoch and thus start 
receiving new partitions.
+ *   The member transitions to the next state only 
when it has acknowledged
+ *   the revocation.
+ * - ASSIGNING - This state means that the member waits on 
partitions which are still
+ *   owned by other members in the group. It remains 
in this state until
+ *   they are all freed up.
+ * - STABLE- This state means that the member has received all 
its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+/**
+   
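
As a companion to the javadoc above, a minimal usage sketch of the builder (the fluent method names match the fields described in the diff; an epoch lookup that always returns -1 is an illustrative assumption meaning every target partition is already free):

// With nothing to revoke and nothing pending assignment, the member
// should transition directly to STABLE at the new target epoch.
ConsumerGroupMember updated = new CurrentAssignmentBuilder(member)
    .withTargetAssignment(11, targetAssignment)
    .withCurrentPartitionEpoch((topicId, partitionId) -> -1)
    .build();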

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185195017


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:
+ *   This is the initial state of the state machine. The state machine starts 
here when the next epoch
+ *   does not match the target epoch. It means that a new target assignment 
has been installed so the
+ *   reconciliation process must restart. In this state, the Assigned, 
Revoking and Assigning sets are
+ *   computed. If Revoking is not empty, the state machine transitions to 
REVOKING; if Assigning is not
+ *   empty, it transitions to ASSIGNING; otherwise it transitions to STABLE.
+ *
+ * - REVOKING:
+ *   This state means that the member must revoke partitions before it can 
transition to the next epoch
+ *   and thus start receiving new partitions. This is to guarantee that 
offsets of revoked partitions
+ *   are committed with the current epoch. The member transitions to the next 
state only when it has
+ *   acknowledged the revocation.
+ *
+ * - ASSIGNING:
+ *   This state means that the member waits on partitions which are still 
owned by other members in the
+ *   group. It remains in this state until they are all freed up.
+ *
+ * - STABLE:
+ *   This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+/**
+ * Constructs the CurrentAssignmentBuilder based on the current state of 
the
+ * provided consumer group member.
+ *
+ * @param member The consumer group member that must be reconciled.
+ */
+public CurrentAssignmentBui

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185193708


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:
+ *   This is the initial state of the state machine. The state machine starts 
here when the next epoch
+ *   does not match the target epoch. It means that a new target assignment 
has been installed so the
+ *   reconciliation process must restart. In this state, the Assigned, 
Revoking and Assigning sets are
+ *   computed. If Revoking is not empty, the state machine transitions to 
REVOKING; if Assigning is not
+ *   empty, it transitions to ASSIGNING; otherwise it transitions to STABLE.
+ *
+ * - REVOKING:
+ *   This state means that the member must revoke partitions before it can 
transition to the next epoch
+ *   and thus start receiving new partitions. This is to guarantee that 
offsets of revoked partitions
+ *   are committed with the current epoch. The member transitions to the next 
state only when it has
+ *   acknowledged the revocation.
+ *
+ * - ASSIGNING:
+ *   This state means that the member waits on partitions which are still 
owned by other members in the
+ *   group. It remains in this state until they are all freed up.
+ *
+ * - STABLE:
+ *   This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+/**
+ * Constructs the CurrentAssignmentBuilder based on the current state of 
the
+ * provided consumer group member.
+ *
+ * @param member The consumer group member that must be reconciled.
+ */
+public CurrentAssignmentBui

[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


dajac commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185188619


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. 
This represents what
+ *the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke 
before it can transition
+ *to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually 
receive. The partitions
+ *in this set are still owned by other members in the 
group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. 
The state machine starts
+ *   here when the next epoch does not match the 
target epoch. It means that
+ *   a new target assignment has been installed so the 
reconciliation process
+ *   must restart. In this state, the Assigned, 
Revoking and Assigning sets
+ *   are computed. If Revoking is not empty, the state 
machine transitions
+ *   to REVOKE; if Assigning is not empty, it 
transitions to ASSIGNING;
+ *   otherwise it transitions to STABLE.
+ * - REVOKE- This state means that the member must revoke 
partitions before it can
+ *   transition to the next epoch and thus start 
receiving new partitions.
+ *   The member transitions to the next state only 
when it has acknowledged
+ *   the revocation.
+ * - ASSIGNING - This state means that the member waits on 
partitions which are still
+ *   owned by other members in the group. It remains 
in this state until
+ *   they are all freed up.
+ * - STABLE- This state means that the member has received all 
its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+/**
+   

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185172881


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch  - The current epoch of the member.
+ * - Next Epoch - The desired epoch of the member. It corresponds to the 
epoch of
+ *the target/desired assignment. The member transitions to 
this epoch
+ *when it has revoked the partitions that it does not own 
or if it
+ *does not have to revoke any.
+ * - Previous Epoch - The previous epoch of the member when the state was 
updated.
+ * - Assigned Set   - The set of partitions currently assigned to the member. 
This represents what
+ *the member should have.
+ * - Revoking Set   - The set of partitions that the member should revoke 
before it can transition
+ *to the next state.
+ * - Assigning Set  - The set of partitions that the member will eventually 
receive. The partitions
+ *in this set are still owned by other members in the 
group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. 
The state machine starts
+ *   here when the next epoch does not match the 
target epoch. It means that
+ *   a new target assignment has been installed so the 
reconciliation process
+ *   must restart. In this state, the Assigned, 
Revoking and Assigning sets
+ *   are computed. If Revoking is not empty, the state 
machine transitions
+ *   to REVOKE; if Assigning is not empty, it 
transitions to ASSIGNING;
+ *   otherwise it transitions to STABLE.
+ * - REVOKE- This state means that the member must revoke 
partitions before it can
+ *   transition to the next epoch and thus start 
receiving new partitions.
+ *   The member transitions to the next state only 
when it has acknowledged
+ *   the revocation.
+ * - ASSIGNING - This state means that the member waits on 
partitions which are still
+ *   owned by other members in the group. It remains 
in this state until
+ *   they are all freed up.
+ * - STABLE- This state means that the member has received all 
its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+/**

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder

2023-05-04 Thread via GitHub


jeffkbkim commented on code in PR #13638:
URL: https://github.com/apache/kafka/pull/13638#discussion_r1185168753


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
+ * consumer group protocol. Given the current state of a member and a desired 
or target
+ * assignment state, the state machine takes the necessary steps to converge 
them.
+ *
+ * The member state has the following properties:
+ * - Current Epoch:
+ *   The current epoch of the member.
+ *
+ * - Next Epoch:
+ *   The desired epoch of the member. It corresponds to the epoch of the 
target/desired assignment.
+ *   The member transitions to this epoch when it has revoked the partitions 
that it does not own
+ *   or if it does not have to revoke any.
+ *
+ * - Previous Epoch:
+ *   The previous epoch of the member when the state was updated.
+ *
+ * - Assigned Partitions:
+ *   The set of partitions currently assigned to the member. This represents 
what the member should have.
+ *
+ * - Partitions Pending Revocation:
+ *   The set of partitions that the member should revoke before it can 
transition to the next state.
+ *
+ * - Partitions Pending Assignment:
+ *   The set of partitions that the member will eventually receive. The 
partitions in this set are
+ *   still owned by other members in the group.
+ *
+ * The state machine has four states:
+ * - NEW_TARGET_ASSIGNMENT:
+ *   This is the initial state of the state machine. The state machine starts 
here when the next epoch
+ *   does not match the target epoch. It means that a new target assignment 
has been installed so the
+ *   reconciliation process must restart. In this state, the Assigned, 
Revoking and Assigning sets are
+ *   computed. If Revoking is not empty, the state machine transitions to 
REVOKING; if Assigning is not
+ *   empty, it transitions to ASSIGNING; otherwise it transitions to STABLE.
+ *
+ * - REVOKING:
+ *   This state means that the member must revoke partitions before it can 
transition to the next epoch
+ *   and thus start receiving new partitions. This is to guarantee that 
offsets of revoked partitions
+ *   are committed with the current epoch. The member transitions to the next 
state only when it has
+ *   acknowledged the revocation.
+ *
+ * - ASSIGNING:
+ *   This state means that the member waits on partitions which are still 
owned by other members in the
+ *   group. It remains in this state until they are all freed up.
+ *
+ * - STABLE:
+ *   This state means that the member has received all its assigned partitions.
+ */
+public class CurrentAssignmentBuilder {
+/**
+ * The consumer group member which is reconciled.
+ */
+private final ConsumerGroupMember member;
+
+/**
+ * The target assignment epoch.
+ */
+private int targetAssignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private Assignment targetAssignment;
+
+/**
+ * A function which returns the current epoch of a topic-partition or -1 
if the
+ * topic-partition is not assigned. The current epoch is the epoch of the 
current owner.
+ */
+private BiFunction<Uuid, Integer, Integer> currentPartitionEpoch;
+
+/**
+ * The partitions owned by the consumer. This is directly provided by the 
member in the
+ * ConsumerGroupHeartbeat request.
+ */
+private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions;
+
+/**
+ * Constructs the CurrentAssignmentBuilder based on the current state of 
the
+ * provided consumer group member.
+ *
+ * @param member The consumer group member that must be reconciled.
+ */
+public CurrentAssignmen

[GitHub] [kafka] clolov commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-05-04 Thread via GitHub


clolov commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1185137613


##
core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala:
##
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import kafka.server.KafkaApisTest.{NameAndId, newOffsetCommitRequestData, 
newOffsetCommitResponseData}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, 
OffsetAndMetadata}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.common.requests.OffsetCommitRequestTest.assertResponseEquals
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.requests.{OffsetCommitRequest, 
OffsetCommitResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+
+import java.util.Optional.empty
+import java.util.Properties
+import scala.collection.{Map, Seq}
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, 
SeqHasAsJava}
+
+class OffsetCommitRequestTest extends BaseRequestTest {
+  override def brokerCount: Int = 1
+
+  val brokerId: Integer = 0
+  val offset = 15L
+  val groupId = "groupId"
+
+  var consumer: KafkaConsumer[_, _] = _
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+  }
+
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+super.setUp(testInfo)
+consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+consumer = createConsumer()
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+if (consumer != null)
+  Utils.closeQuietly(consumer, "KafkaConsumer")
+super.tearDown()
+  }
+
+  def createTopics(topicNames: String*): Seq[NameAndId] = {
+topicNames.map(topic => {
+  createTopic(topic)
+  val topicId: Uuid = getTopicIds().get(topic) match {
+case Some(x) => x
+case _ => throw new AssertionError("Topic ID not found for " + topic)
+  }
+  NameAndId(topic, topicId)
+})
+  }
+
+
+  @Test
+  def testTopicIdsArePopulatedInOffsetCommitResponses(): Unit = {
+val topics = createTopics("topic1", "topic2", "topic3")

Review Comment:
   Sorry, could you elaborate, because I am not certain I follow? 
`getTopicIds(...)` will return a map but only if the topics requested have been 
created first. Are you suggesting that since all tests create these three 
topics we move the creation to the setup method and then we use `getTopicIds` 
everywhere else?
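   
   If the suggestion is the latter, a sketch of that refactor (reusing `createTopics` and `NameAndId` from the quoted test; purely illustrative):
   
   var topics: Seq[NameAndId] = Seq.empty
   
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
     consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
     consumer = createConsumer()
     // create the topics once so every test can resolve IDs via getTopicIds()
     topics = createTopics("topic1", "topic2", "topic3")
   }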



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185104750


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer<Integer, String> producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer<Integer, String> get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 
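
The quoted diff is truncated above; for context, the two send paths it calls might look roughly like this (a sketch: the method names come from the quoted loop, the bodies are assumptions about the example code rather than a quote of the PR):

private void asyncSend(KafkaProducer<Integer, String> producer, int key, String value) {
    // async mode: hand the record to the producer with a callback and
    // return immediately without waiting for the broker response
    producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
    });
}

private void syncSend(KafkaProducer<Integer, String> producer, int key, String value)
        throws ExecutionException, InterruptedException {
    // sync mode: block on the returned future until the broker acknowledges
    producer.send(new ProducerRecord<>(topic, key, value)).get();
}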

[GitHub] [kafka] lucasbru commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-05-04 Thread via GitHub


lucasbru commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1185039728


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
   In the state updater code path, we would immediately go back to processing / 
committing the active (previously existing) tasks, so I am not sure the problem 
being solved here is relevant in the state updater code path. But I may be 
missing something. 
   
   In that case, we should probably only execute the commit if the state 
updater code path is not enabled.
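   
   Concretely, that guard might look like the following (a sketch; `stateUpdaterEnabled` is a hypothetical flag standing in for however the code detects the new path):
   
   // Only fall back to the eager commit when the state updater is not in use.
   if (!stateUpdaterEnabled
           && processingMode == EXACTLY_ONCE_V2
           && threadProducer().transactionInFlight()
           && !newActiveTasks.isEmpty()) {
       commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());
   }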



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-05-04 Thread via GitHub


dajac commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r118500


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates a generic group member's metadata.
+ *
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *    its rebalance callback will be kept in the metadata if the
+ *    member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *    is kept in metadata until the leader provides the group assignment
+ *    and the group transitions to stable
+ */
+public class GenericGroupMember {
+
+private static class MemberSummary {
+private final String memberId;
+private final Optional<String> groupInstanceId;
+private final String clientId;
+private final String clientHost;
+private final byte[] metadata;
+private final byte[] assignment;
+
+public MemberSummary(String memberId,
+ Optional<String> groupInstanceId,
+ String clientId,
+ String clientHost,
+ byte[] metadata,
+ byte[] assignment) {
+
+this.memberId = memberId;
+this.groupInstanceId = groupInstanceId;
+this.clientId = clientId;
+this.clientHost = clientHost;
+this.metadata = metadata;
+this.assignment = assignment;
+}
+
+public String memberId() {
+return memberId;
+}
+
+public Optional<String> getGroupInstanceId() {
+return groupInstanceId;
+}
+
+public String clientId() {
+return clientId;
+}
+
+public String clientHost() {
+return clientHost;
+}
+
+public byte[] metadata() {
+return metadata;
+}
+
+public byte[] assignment() {
+return assignment;
+}
+
+}
+
+/**
+ * The member id.
+ */
+private final String memberId;
+
+/**
+ * The group instance id.
+ */
+private final Optional<String> groupInstanceId;
+
+/**
+ * The client id.
+ */
+private final String clientId;
+
+/**
+ * The client host.
+ */
+private final String clientHost;
+
+/**
+ * The rebalance timeout in milliseconds.
+ */
+private final int rebalanceTimeoutMs;
+
+/**
+ * The session timeout in milliseconds.
+ */
+private final int sessionTimeoutMs;
+
+/**
+ * The protocol type.
+ */
+private final String protocolType;
+
+/**
+ * The list of supported protocols.
+ */
+private final List<Protocol> supportedProtocols;
+
+/**
+ * The assignment stored by the client assignor.
+ */
+private final byte[] assignment;
+
+/**
+ * The callback that is invoked once this member joins the group.
+ */
+private CompletableFuture<JoinGroupResponseData> awaitingJoinCallback = null;

Review Comment:
   nit: Should we call this one `awaitingJoinCallbackFuture`?



##
group-coordinator/s

[GitHub] [kafka] clolov commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

2023-05-04 Thread via GitHub


clolov commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1185002319


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -129,16 +150,19 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public abstract class ConsumerCoordinatorTest {
-private final String topic1 = "test1";
-private final String topic2 = "test2";
-private final TopicPartition t1p = new TopicPartition(topic1, 0);
-private final TopicPartition t2p = new TopicPartition(topic2, 0);
-private final String groupId = "test-group";
+private static String topic1 = "test1";
+private static String topic2 = "test2";
+private static TopicPartition t1p = new TopicPartition(topic1, 0);
+private static TopicIdPartition ti1p = new TopicIdPartition(Uuid.randomUuid(), t1p);

Review Comment:
   I believe t1p is abstracted because it is being used in 175 other places in 
this test class for test setup and assertions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13673: MINOR: Update dependencies (minor versions only)

2023-05-04 Thread via GitHub


machi1990 commented on PR #13673:
URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534757023

   > > In relation to dependency upgrade, has there been any discussion around 
automated tooling e.g usage of dependabot or renovate?
   > 
   > I don't know. I have seen @ijuma being the one who periodically performs 
dependency upgrades. He may be able to provide more info about this. 
   
   Thanks, I'll be interested in any details that could be provided @ijuma 
   
   > Dependabot is a good idea (and some other Apache communities use it), 
except when it leads to noise. I don't know if there is a way to "mute" it and 
enable only at the beginning of a release cycle.
   
   Yes, it is possible. With dependabot you can limit the number of PRs opened.
Setting the limit to `0` is equivalent to disabling dependency updates for a
given package ecosystem. Renovate has a disabling flag, which could be used.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13673: MINOR: Update dependencies (minor versions only)

2023-05-04 Thread via GitHub


divijvaidya commented on PR #13673:
URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534729885

   > In relation to dependency upgrade, has there been any discussion around 
automated tooling e.g usage of dependabot or renovate?
   
   I don't know. I have seen @ijuma being the one who periodically performs 
dependency upgrades. He may be able to provide more info about this. Dependabot 
is a good idea (and some other Apache communities use it), except when it leads 
to noise. I don't know if there is a way to "mute" it and enable only at the 
beginning of a release cycle.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #13675: KAFKA-14462; [14/N]; Add PartitionWriter

2023-05-04 Thread via GitHub


dajac opened a new pull request, #13675:
URL: https://github.com/apache/kafka/pull/13675

   This patch introduces the `PartitionWriter` interface in the 
`group-coordinator` module. The `ReplicaManager` resides in the `core` module 
and it is thus not accessible from the `group-coordinator` one. The 
`PartitionWriterImpl` is basically an implementation of the interface residing 
in `core` which interfaces with the `ReplicaManager`.
   
   One notable difference from the usual produce path is that the 
`PartitionWriter` returns the offset following the written records. This is 
then used by the coordinator runtime (coming later) to track when the request 
associated with the write can be completed.
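   
   To make that concrete, a minimal sketch of what such an interface might look like; the method name, generics, and signature are assumptions based on the description above, not the actual code in the PR:
   
```java
import java.util.List;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of a PartitionWriter; not the PR's actual signature.
public interface PartitionWriter<T> {
    // Appends the records to the partition and returns the offset following
    // the written records, which the coordinator runtime can use to complete
    // the associated request once the log end offset reaches it.
    long append(TopicPartition tp, List<T> records);
}
```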
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13673: MINOR: Update dependencies (minor versions only)

2023-05-04 Thread via GitHub


divijvaidya commented on PR #13673:
URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534725676

   Thanks for your comment @machi1990. In principle what you say is right but 
given the limited committer bandwidth in the community, I am trying to optimize 
for code reviewer comfort right now. That is why I have intentionally added 
only the non-controversial upgrades here in the PR. In case of a need for 
rollback, we can always choose to roll-forward by modifying the version of a 
specific dependency. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13673: MINOR: Update dependencies (minor versions only)

2023-05-04 Thread via GitHub


machi1990 commented on PR #13673:
URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534706839

   In relation to dependency upgrade, has there been any discussion around 
automated tooling e.g usage of dependabot or renovate? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13673: MINOR: Update dependencies (minor versions only)

2023-05-04 Thread via GitHub


machi1990 commented on PR #13673:
URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534704953

   Thanks @divijvaidya. I am wondering whether it would be best to split each
upgrade into a separate PR? That would make each dependency update atomic and
thus easier to revert in case we notice an issue related to a specific
dependency upgrade. What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya opened a new pull request, #13674: MINOR: Release resources in FetcherTest

2023-05-04 Thread via GitHub


divijvaidya opened a new pull request, #13674:
URL: https://github.com/apache/kafka/pull/13674

   FetcherTest fails intermittently with OOM since it doesn't close the spied
resources properly (a sketch of the fix idea follows the stack trace below).
   
   ```
   org.gradle.api.internal.tasks.testing.TestSuiteExecutionException: Could not 
complete execution for Gradle Test Executor 1565.
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:64)
at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
at 
app//worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at 
app//worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
   Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
java.base/java.lang.invoke.MethodHandleImpl.makePairwiseConvertByEditor(MethodHandleImpl.java:318)
at 
java.base/java.lang.invoke.MethodHandleImpl.makePairwiseConvert(MethodHandleImpl.java:262)
at 
java.base/java.lang.invoke.MethodHandleImpl.makePairwiseConvert(MethodHandleImpl.java:379)
at 
java.base/java.lang.invoke.MethodHandle.asTypeUncached(MethodHandle.java:885)
at java.base/java.lang.invoke.MethodHandle.asType(MethodHandle.java:869)
at 
java.base/java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.isUnavailable(ConsumerNetworkClient.java:559)
at 
org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:461)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:72)
at 
org.apache.kafka.clients.consumer.internals.FetcherTest.sendFetches(FetcherTest.java:236)
at 
org.apache.kafka.clients.consumer.internals.FetcherTest.testFetcherConcurrency(FetcherTest.java:2945)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
   ```
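   
   A hedged sketch of the fix idea, assuming test field names like `fetcher` and `consumerClient` (the actual members in FetcherTest may differ):
   
```java
import org.junit.jupiter.api.AfterEach;
import org.mockito.Mockito;

// Release spied/mocked resources after every test so the test executor does
// not accumulate references across the many parameterized runs.
@AfterEach
public void tearDown() throws Exception {
    if (fetcher != null) fetcher.close();               // hypothetical field
    if (consumerClient != null) consumerClient.close(); // hypothetical field
    Mockito.framework().clearInlineMocks();             // free mock state
}
```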


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya opened a new pull request, #13673: MINOR: Update dependencies (minor versions only)

2023-05-04 Thread via GitHub


divijvaidya opened a new pull request, #13673:
URL: https://github.com/apache/kafka/pull/13673

   All dependency upgrades in this PR are minor upgrades with backward
compatible changes. Note that no major versions of dependencies have been
changed, to keep this a low-risk change (and possibly merge to 3.5). There are
separate PRs such as https://github.com/apache/kafka/pull/13662 which will
upgrade the major versions.
   
   ## Release notes for dependencies:
   ### bcpkix
   Release notes: https://www.bouncycastle.org/releasenotes.html#r1rv73 
   ### httpclient
   Release notes: 
https://downloads.apache.org/httpcomponents/httpclient/RELEASE_NOTES-4.5.x.txt 
   ### jackson and jackson-databind
   Release notes: 
https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.13.5 
   ### jacoco
   Release notes: https://github.com/jacoco/jacoco/releases 
   ### javaassist 
   Release notes: https://github.com/jboss-javassist/javassist/releases
   ### jetty
   Release notes: 
https://github.com/eclipse/jetty.project/releases?q=9.4.51&expanded=true 
   ### jersey
   Release notes: https://github.com/eclipse-ee4j/jersey/releases 
   ### jline
   Release notes: https://github.com/jline/jline3/releases
   ### jaxb
   Can't find release notes! But 
https://mvnrepository.com/artifact/javax.xml.bind/jaxb-api/2.3.1 is the latest 
version in maven.
   ### junit
   Release notes: 
https://junit.org/junit5/docs/current/release-notes/index.html#release-notes-5.9.3
 
   ### mockito
   Release notes: 
https://github.com/mockito/mockito/releases?q=v4.11.0&expanded=true
   ### netty
   Release notes: https://netty.io/news/2023/04/25/4-1-92-Final.html 
   ### reload4j
   Release notes: https://reload4j.qos.ch/ 
   ### scalaCollectionCompat
   Release notes: 
https://github.com/scala/scala-collection-compat/releases?q=2.10.0&expanded=true
   ### scoverage
   Release notes: https://github.com/scoverage/sbt-scoverage/releases?page=2 
   
   ## Compatibility
   - Verified build and test with JDK8 and JDK 17
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14817) LogCleaner mark some partitions of __consumer_offsets as uncleanable

2023-05-04 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719288#comment-17719288
 ] 

Sergey Ivanov commented on KAFKA-14817:
---

[~showuon], thank you for the explanation!

At the moment we "fixed" the issue with uncleaned segments by temporarily
setting the "delete" policy on the problem environment, and there are no more
uncleaned partitions.

But I'll try to find the same issue on other non-sensitive environments, and if
I find it I will attach a dump.

> LogCleaner mark some partitions of __consumer_offsets as uncleanable
> 
>
> Key: KAFKA-14817
> URL: https://issues.apache.org/jira/browse/KAFKA-14817
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.1
>Reporter: ZhenChun Pan
>Priority: Major
>
> We find some partitions of topic __consumer_offsets can't retain their logs 
> any more and take up a lot of disk space. Then we found these partitions of 
> topic __consumer_offsets have been marked as uncleanable in log-cleaner.log. 
> The logs below:
> [2023-03-17 17:53:46,655] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,770] INFO [kafka-log-cleaner-thread-0]: Starting 
> (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,841] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-24. (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,841] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-24... (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,013] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-24 for 5 segments in offset range [0, 2360519). 
> (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,394] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144 bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,395] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288 bytes to 1048576 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,396] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 1048576 bytes to 2097152 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,401] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 2097152 bytes to 4194304 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,409] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 4194304 bytes to 8388608 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,434] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 8388608 bytes to 10485772 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,465] WARN [kafka-log-cleaner-thread-0]: Unexpected 
> exception thrown when cleaning log 
> Log(dir=/opt/kafka-service/data/__consumer_offsets-24, 
> topic=__consumer_offsets, partition=24, highWatermark=0, lastStableOffset=0, 
> logStartOffset=0, logEndOffset=2759760). Marking its partition 
> (__consumer_offsets-24) as uncleanable (kafka.log.LogCleaner)
> kafka.log.LogCleaningException: Batch size 223 < buffer size 10485772, but 
> not processed for log segment 
> /opt/kafka-service/data/__consumer_offsets-24/.log at 
> position 31457091
>         at 
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:356)
>         at 
> kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:321)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> Caused by: java.lang.IllegalStateException: Batch size 223 < buffer size 
> 10485772, but not processed for log segment 
> /opt/kafka-service/data/__consumer_offsets-24/.log at 
> position 31457091
>         at kafka.log.Cleaner.growBuffersOrFail(LogCleaner.scala:745)
>         at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:983)
>         at kafka.log.Cleaner.$anonfun$buildOffsetMap$5(LogCleaner.scala:908)
>         at 
> kafka.log.Cleaner.$anonfun$buildOffsetMap$5$adapted(LogCleaner.scala:904)
>         at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
>         at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:904)
>         at kafka.log.Cleaner.doClean(LogCleaner.scala:523)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:511)
>         at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:380)
>         at 
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:352)
>         ... 3 more
> [2023-03-17 17:54:02,477] INFO Cleaner 0: Beginning cl

[jira] [Comment Edited] (KAFKA-13392) Timeout Exception triggering reassign partitions with --bootstrap-server option

2023-05-04 Thread Jonas Lundholm Bertelsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719274#comment-17719274
 ] 

Jonas Lundholm Bertelsen edited comment on KAFKA-13392 at 5/4/23 11:12 AM:
---

We have seen this same issue with a Kafka 3.3 cluster where one broker is down 
and we want to reassign replicas away from it.

The `--zookeeper` argument is no longer available, so we were not able to use 
that workaround.

An alternative workaround is to simply omit the throttle parameter. This 
obviously has its own drawbacks. In our case we split the reassignment file 
into smaller parts and applied them one after the other to try and put a 
"manual throttle" on activity spikes.


was (Author: JIRAUSER300177):
We have seen this same issue with a Kafka 3.4 cluster where one broker is down 
and we want to reassign replicas away from it.

The `--zookeeper` argument is no longer available, so we were not able to use 
that workaround.

An alternative workaround is to simply omit the throttle parameter. This 
obviously has its own drawbacks. In our case we split the reassignment file 
into smaller parts and applied them one after the other to try and put a 
"manual throttle" on activity spikes.

> Timeout Exception triggering reassign partitions with --bootstrap-server 
> option
> ---
>
> Key: KAFKA-13392
> URL: https://issues.apache.org/jira/browse/KAFKA-13392
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0
>Reporter: Yevgeniy Korin
>Priority: Minor
>
> *Scenario where we faced this issue:*
>  One of three brokers is down. Add another (fourth) broker and try to 
> reassign partitions using '--bootstrap-server'
>  option.
> *What's failed:*
> {code:java}
> /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 
> xxx.xxx.xxx.xxx:9092 --reassignment-json-file 
> /tmp/reassignment-20211021130718.json --throttle 1 --execute{code}
> failed with
> {code:java}
> Error: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, 
> nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s)
>  java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, 
> nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.ReassignPartitionsCommand$.modifyInterBrokerThrottle(ReassignPartitionsCommand.scala:1435)
>  at 
> kafka.admin.ReassignPartitionsCommand$.modifyReassignmentThrottle(ReassignPartitionsCommand.scala:1412)
>  at 
> kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:974)
>  at 
> kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:255)
>  at 
> kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:216)
>  at 
> kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
>  Caused by: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, 
> nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s)
>  Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out 
> waiting for a node assignment. Call: incrementalAlterConfigs{code}
>  *Expected behavior:*
>  partition reassignment process started.
> *Workaround:*
>  Trigger partition reassignment process using '--zookeeper' option:
> {code:java}
> /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper 
> zookeeper.my.company:2181/kafka-cluster --reassignment-json-file 
> /tmp/reassignment-20211021130718.json --throttle 1 --execute{code}
>  *Additional info:*
>  We are able to trigger partition reassignment using '--bootstrap-server' 
> option with no exceptions when all four brokers are alive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-04 Thread via GitHub


satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1184873142


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -670,6 +875,14 @@ public void close() {
 } catch (InterruptedException e) {
 // ignore
 }
+remoteStorageReaderThreadPool.shutdownNow();
+//waits for 2 mins to terminate the current tasks
+try {
+remoteStorageReaderThreadPool.awaitTermination(2, 
TimeUnit.MINUTES);

Review Comment:
   That code `lifecycleManager.controlledShutdownFuture` is more about
processing the controlled shutdown event to the controller for that broker. It
will wait for 5 mins before proceeding with the rest of the shutdown sequence,
but that will not be affected by the code introduced here.
   The logging subsystem handles unclean shutdown for log segments, and it
would have already finished before RemoteLogManager is closed. So, it will not
be affected by this timeout. But we can have a short duration here, like 10
secs, and we can revisit introducing a config if it is really needed for
closing the remote log subsystem.
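   
   A short sketch of the suggested shutdown with a 10 second bound (the duration is the reviewer's suggestion, not a committed value):
   
```java
remoteStorageReaderThreadPool.shutdownNow();
try {
    // Bound the wait so broker shutdown is not held up by remote reads.
    if (!remoteStorageReaderThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
        // tasks are still running; proceed with shutdown anyway
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
```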



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13392) Timeout Exception triggering reassign partitions with --bootstrap-server option

2023-05-04 Thread Jonas Lundholm Bertelsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719274#comment-17719274
 ] 

Jonas Lundholm Bertelsen commented on KAFKA-13392:
--

We have seen this same issue with a Kafka 3.4 cluster where one broker is down 
and we want to reassign replicas away from it.

The `--zookeeper` argument is no longer available, so we were not able to use 
that workaround.

An alternative workaround is to simply omit the throttle parameter. This 
obviously has its own drawbacks. In our case we split the reassignment file 
into smaller parts and applied them one after the other to try and put a 
"manual throttle" on activity spikes.

> Timeout Exception triggering reassign partitions with --bootstrap-server 
> option
> ---
>
> Key: KAFKA-13392
> URL: https://issues.apache.org/jira/browse/KAFKA-13392
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0
>Reporter: Yevgeniy Korin
>Priority: Minor
>
> *Scenario where we faced this issue:*
>  One of three brokers is down. Add another (fourth) broker and try to 
> reassign partitions using '--bootstrap-server'
>  option.
> *What's failed:*
> {code:java}
> /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 
> xxx.xxx.xxx.xxx:9092 --reassignment-json-file 
> /tmp/reassignment-20211021130718.json --throttle 1 --execute{code}
> failed with
> {code:java}
> Error: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, 
> nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s)
>  java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, 
> nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.ReassignPartitionsCommand$.modifyInterBrokerThrottle(ReassignPartitionsCommand.scala:1435)
>  at 
> kafka.admin.ReassignPartitionsCommand$.modifyReassignmentThrottle(ReassignPartitionsCommand.scala:1412)
>  at 
> kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:974)
>  at 
> kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:255)
>  at 
> kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:216)
>  at 
> kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
>  Caused by: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, 
> nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s)
>  Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out 
> waiting for a node assignment. Call: incrementalAlterConfigs{code}
>  *Expected behavior:*
>  partition reassignment process started.
> *Workaround:*
>  Trigger partition reassignment process using '--zookeeper' option:
> {code:java}
> /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper 
> zookeeper.my.company:2181/kafka-cluster --reassignment-json-file 
> /tmp/reassignment-20211021130718.json --throttle 1 --execute{code}
>  *Additional info:*
>  We are able to trigger partition reassignment using '--bootstrap-server' 
> option with no exceptions when all four brokers are alive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya opened a new pull request, #13672: MINOR: Print the cause of failure for PlaintextAdminIntegrationTest

2023-05-04 Thread via GitHub


divijvaidya opened a new pull request, #13672:
URL: https://github.com/apache/kafka/pull/13672

   # Motivation
   PlaintextAdminIntegrationTest fails in a flaky manner with the follow trace 
(e.g. in [this 
build](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13670/3/tests/)):
   ```
   org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31)
at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:228)
at 
kafka.api.PlaintextAdminIntegrationTest.testElectUncleanLeadersForOnePartition(PlaintextAdminIntegrationTest.scala:1583)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   ```
   The std output doesn't contain useful information that we could use to debug
the cause of failure. This is because the test currently only validates whether
an exception is present and fails when one is; it does not print what the
exception is.
   
   # Change
   1. Make the test a bit more robust by waiting for server startup.
   2. Fail the test with the actual unexpected exception, which will help in
debugging the cause of failure (see the sketch below).
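   
   For illustration, a hedged sketch of change (2): instead of a bare `assertFalse`, surface the actual exception. Variable names are assumptions; `ElectLeadersResult.partitions()` returns a future of a map of optional throwables:
   
```java
// Fail with the real cause so the CI output shows why the election failed.
Map<TopicPartition, Optional<Throwable>> electResult = result.partitions().get();
electResult.get(partition1).ifPresent(t ->
    fail("Unexpected exception during unclean leader election", t));
```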


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hertzsprung opened a new pull request, #13671: KAFKA-14967: fix NPE in CreateTopicsResult

2023-05-04 Thread via GitHub


hertzsprung opened a new pull request, #13671:
URL: https://github.com/apache/kafka/pull/13671

   instead of passing `null` to the future, pass a populated 
`TopicAndMetadataConfig`
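   
   For illustration, a hedged sketch of the idea using the public `CreateTopicsResult.TopicMetadataAndConfig` type (the exact class and values used in this PR may differ):
   
```java
// Complete the future with populated metadata instead of null so that the
// topicId()/numPartitions()/replicationFactor() futures do not NPE.
future.complete(new CreateTopicsResult.TopicMetadataAndConfig(
    topicId, numPartitions, replicationFactor, config));
```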
   
   `MockAdminClient` has no tests that I'm aware of.
   
   The contribution is my original work and that I license the work to the 
project under the project's open source license.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1184835821


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer<Integer, String> producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer<Integer, String> get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 
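   
   The diff is truncated above; for context, a minimal sketch of what `createKafkaProducer()` might look like, assuming it simply moves the old constructor's `Properties` setup onto the new fields (not necessarily the exact PR code):
   
```java
public KafkaProducer<Integer, String> createKafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // a unique client id, e.g. derived from a UUID (assumption)
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    if (transactionTimeoutMs > 0) {
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
    }
    if (transactionalId != null) {
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
    }
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
    return new KafkaProducer<>(props);
}
```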

[jira] [Comment Edited] (KAFKA-14817) LogCleaner mark some partitions of __consumer_offsets as uncleanable

2023-05-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719250#comment-17719250
 ] 

Luke Chen edited comment on KAFKA-14817 at 5/4/23 10:11 AM:


[~polaris.alioth] [~mrMigles], thanks for reporting this issue. The error means
we cannot recognize the batch at that log position (maybe corrupted), so we
cannot proceed with compaction for this partition's logs. As pointed out in this article
[https://luppeng.wordpress.com/2022/08/21/possible-reasons-why-a-kafka-topic-is-not-being-compacted/]
(point 3), letting Kafka do log recovery by itself should be able to fix it.

About how we can fix this permanently, I have to think about it.

But before that, could you help verify whether the log segments that appear in
the log are indeed corrupted?

You can verify it by running this command to dump the uncleanable logs:
{code:java}
./bin/kafka-dump-log.sh --files 
/tmp/kraft-combined-logs/quickstart-events-0/.log{code}
Of course, if it's allowed, it would be better if you could upload the
uncleanable logs for me to investigate. But I can understand if it's not
permitted.

 

Thanks.


was (Author: showuon):
[~polaris.alioth] [~mrMigles], thanks for reporting this issue. The error means
we cannot recognize the batch at that log position (maybe corrupted), so we
cannot proceed with compaction for this partition's logs. Like [~mrMigles]
pointed out, letting Kafka do log recovery by itself should be able to fix it.

About how we can fix this permanently, I have to think about it.

But before that, could you help verify whether the log segments that appear in
the log are indeed corrupted?

You can verify it by running this command to dump the uncleanable logs:
{code:java}
./bin/kafka-dump-log.sh --files 
/tmp/kraft-combined-logs/quickstart-events-0/.log{code}
Of course, if it's allowed, it would be better if you could upload the
uncleanable logs for me to investigate. But I can understand if it's not
permitted.

 

Thanks.

> LogCleaner mark some partitions of __consumer_offsets as uncleanable
> 
>
> Key: KAFKA-14817
> URL: https://issues.apache.org/jira/browse/KAFKA-14817
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.1
>Reporter: ZhenChun Pan
>Priority: Major
>
> We find some partitions of topic __consumer_offsets can't retain their logs 
> any more and take up a lot of disk space. Then we found these partitions of 
> topic __consumer_offsets have been marked as uncleanable in log-cleaner.log. 
> The logs below:
> [2023-03-17 17:53:46,655] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,770] INFO [kafka-log-cleaner-thread-0]: Starting 
> (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,841] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-24. (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,841] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-24... (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,013] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-24 for 5 segments in offset range [0, 2360519). 
> (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,394] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144 bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,395] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288 bytes to 1048576 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,396] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 1048576 bytes to 2097152 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,401] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 2097152 bytes to 4194304 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,409] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 4194304 bytes to 8388608 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,434] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 8388608 bytes to 10485772 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,465] WARN [kafka-log-cleaner-thread-0]: Unexpected 
> exception thrown when cleaning log 
> Log(dir=/opt/kafka-service/data/__consumer_offsets-24, 
> topic=__consumer_offsets, partition=24, highWatermark=0, lastStableOffset=0, 
> logStartOffset=0, logEndOffset=2759760). Marking its partition 
> (__consumer_offsets-24) as uncleanable (kafka.log.LogCleaner)
> kafka.log.LogCleaningException: Batch size 223 < buffer size 10485772, but 
> not processed for log segment 
> /opt/kafka-service/data/__consumer_offsets-24/.log at 
> position 31457091
>         at 
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:356)
>         at 
> kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332)
>       

[jira] [Commented] (KAFKA-14817) LogCleaner mark some partitions of __consumer_offsets as uncleanable

2023-05-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719250#comment-17719250
 ] 

Luke Chen commented on KAFKA-14817:
---

[~polaris.alioth] [~mrMigles], thanks for reporting this issue. The error means
we cannot recognize the batch at that log position (maybe corrupted), so we
cannot proceed with compaction for this partition's logs. Like [~mrMigles]
pointed out, letting Kafka do log recovery by itself should be able to fix it.

About how we can fix this permanently, I have to think about it.

But before that, could you help verify whether the log segments that appear in
the log are indeed corrupted?

You can verify it by running this command to dump the uncleanable logs:
{code:java}
./bin/kafka-dump-log.sh --files 
/tmp/kraft-combined-logs/quickstart-events-0/.log{code}
Of course, if it's allowed, it would be better if you could upload the
uncleanable logs for me to investigate. But I can understand if it's not
permitted.

 

Thanks.

> LogCleaner mark some partitions of __consumer_offsets as uncleanable
> 
>
> Key: KAFKA-14817
> URL: https://issues.apache.org/jira/browse/KAFKA-14817
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.1
>Reporter: ZhenChun Pan
>Priority: Major
>
> We find some partitions of topic __consumer_offsets can't retain their logs 
> any more and take up a lot of disk space. Then we found these partitions of 
> topic __consumer_offsets have been marked as uncleanable in log-cleaner.log. 
> The logs below:
> [2023-03-17 17:53:46,655] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,770] INFO [kafka-log-cleaner-thread-0]: Starting 
> (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,841] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-24. (kafka.log.LogCleaner)
> [2023-03-17 17:53:46,841] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-24... (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,013] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-24 for 5 segments in offset range [0, 2360519). 
> (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,394] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144 bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,395] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288 bytes to 1048576 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,396] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 1048576 bytes to 2097152 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,401] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 2097152 bytes to 4194304 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,409] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 4194304 bytes to 8388608 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,434] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 8388608 bytes to 10485772 bytes. (kafka.log.LogCleaner)
> [2023-03-17 17:53:47,465] WARN [kafka-log-cleaner-thread-0]: Unexpected 
> exception thrown when cleaning log 
> Log(dir=/opt/kafka-service/data/__consumer_offsets-24, 
> topic=__consumer_offsets, partition=24, highWatermark=0, lastStableOffset=0, 
> logStartOffset=0, logEndOffset=2759760). Marking its partition 
> (__consumer_offsets-24) as uncleanable (kafka.log.LogCleaner)
> kafka.log.LogCleaningException: Batch size 223 < buffer size 10485772, but 
> not processed for log segment 
> /opt/kafka-service/data/__consumer_offsets-24/.log at 
> position 31457091
>         at 
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:356)
>         at 
> kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:321)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> Caused by: java.lang.IllegalStateException: Batch size 223 < buffer size 
> 10485772, but not processed for log segment 
> /opt/kafka-service/data/__consumer_offsets-24/.log at 
> position 31457091
>         at kafka.log.Cleaner.growBuffersOrFail(LogCleaner.scala:745)
>         at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:983)
>         at kafka.log.Cleaner.$anonfun$buildOffsetMap$5(LogCleaner.scala:908)
>         at 
> kafka.log.Cleaner.$anonfun$buildOffsetMap$5$adapted(LogCleaner.scala:904)
>         at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.s

[GitHub] [kafka] mimaison merged pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-05-04 Thread via GitHub


mimaison merged PR #13122:
URL: https://github.com/apache/kafka/pull/13122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…

2023-05-04 Thread via GitHub


chia7712 commented on PR #13659:
URL: https://github.com/apache/kafka/pull/13659#issuecomment-1534435900

   @kirktrue could you take a look? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-04 Thread via GitHub


divijvaidya commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1184786130


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -62,6 +65,39 @@ class LogCleanerTest {
 Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+try {
+  val logCleaner = new LogCleaner(new CleanerConfig(true),
+logDirs = Array(TestUtils.tempDir()),
+logs = new Pool[TopicPartition, UnifiedLog](),
+logDirFailureChannel = new LogDirFailureChannel(1),
+time = time)
+
+  // shutdown logCleaner so that metrics are removed
+  logCleaner.shutdown()
+
+  val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+  val numMetricsRegistered = 5
+  verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
   Your suggestion will work (for now) while all metrics in this class are of
type `gauge`, but if someone later introduces a metric of another type and adds
it to MetricNames, it will fail.
   
   Nevertheless, I have made the change as you suggested. The test can be
changed later when someone adds a new metric.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sandybis commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-05-04 Thread via GitHub


sandybis commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1534414079

   @MPeli 
   I found an AccessDenied exception again when a topic is deleted. This causes
a crash the same way it used to when deleting messages.
   Could the same fix be applied to topic deletion as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1184761649


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer<Integer, String> producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer<Integer, String> get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");
+}
+key++;
+sentRecords++;
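
A sketch of what the `asyncSend` and `syncSend` helpers called above might 
look like (the bodies are assumptions, not the PR's code):

```java
// Async: hand the record to the producer and handle the outcome in a callback.
void asyncSend(KafkaProducer<Integer, String> producer, int key, String value) {
    producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
    });
}

// Sync: block on the returned future until the broker acknowledges the record.
void syncSend(KafkaProducer<Integer, String> producer, int key, String value)
        throws ExecutionException, InterruptedException {
    producer.send(new ProducerRecord<>(topic, key, value)).get();
}
```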
  

[jira] [Updated] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult

2023-05-04 Thread James Shaw (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Shaw updated KAFKA-14967:
---
Description: 
Calling {{CreateTopicsResult.topicId().get()}} throws 
{{{}NullPointerException{}}}, while {{KafkaAdminClient}} correctly returns the 
topicId.

The NPE appears to be caused by [{{MockAdminClient.createTopics()}} calling 
{{future.complete(null)}}|https://github.com/apache/kafka/blame/trunk/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java#L394]

Stacktrace:
{code:java}
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
   [snip]
Caused by: java.lang.NullPointerException
at 
org.apache.kafka.common.internals.KafkaFutureImpl.lambda$thenApply$0(KafkaFutureImpl.java:60)
at 
java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684)
at 
java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
at 
java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.thenApply(KafkaFutureImpl.java:58)
at 
org.apache.kafka.clients.admin.CreateTopicsResult.topicId(CreateTopicsResult.java:82)
... 85 more
{code}


Test case to reproduce:
{code:java}
 
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static java.util.Collections.singletonList;

public class MockAdminClientBug {
@Test
void shouldNotThrowNullPointerException() throws ExecutionException, 
InterruptedException {
Node controller = new Node(0, "mock", 0);
try (Admin admin = new MockAdminClient(singletonList(controller), 
controller)) {
CreateTopicsResult result = admin.createTopics(singletonList(new 
NewTopic("TestTopic", Optional.empty(), Optional.empty(;
Uuid topicId = result.topicId("TestTopic").get();
System.out.println(topicId);
}
}
}
{code}
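
A minimal sketch (not the actual patch) of the kind of change in 
{{MockAdminClient.createTopics()}} that would avoid the NPE, assuming the 
{{TopicMetadataAndConfig}} constructor takes the topic id, partition count, 
replication factor, and config (variable names are illustrative):

{code:java}
// Hypothetical sketch: complete the future with real metadata instead of
// null, so CreateTopicsResult.topicId() has a value to map over.
Uuid topicId = Uuid.randomUuid();
future.complete(new CreateTopicsResult.TopicMetadataAndConfig(
    topicId, numPartitions, replicationFactor, config));
{code}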

  was:
Calling {{CreateTopicsResult.topicId().get()}} throws 
{{{}NullPointerException{}}}, while {{KafkaAdminClient}} correctly returns the 
topicId.

Stacktrace:
{code:java}
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
   [snip]
Caused by: java.lang.NullPointerException
at 
org.apache.kafka.common.internals.KafkaFutureImpl.lambda$thenApply$0(KafkaFutureImpl.java:60)
at 
java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684)
at 
java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
at 
java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.thenApply(KafkaFutureImpl.java:58)
at 
org.apache.kafka.clients.admin.CreateTopicsResult.topicId(CreateTopicsResult.java:82)
... 85 more
{code}


Test case to reproduce:
{code:java}
 
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static java.util.Collections.singletonList;

public class MockAdminClientBug {
@Test
void shouldNotThrowNullPointerException() throws ExecutionException, 
InterruptedException {
Node controller = new Node(0, "mock", 0);
try (Admin admin = new MockAdminClient(singletonList(controller), 
controller)) {
CreateTopicsResult result = admin.createTopics(singletonList(new 
NewTopic("TestTopic", Optional.empty(), Optional.empty(;

[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1184756806


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer<Integer, String> producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer<Integer, String> get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");
+}
+key++;
+sentRecords++;
  

[jira] [Updated] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult

2023-05-04 Thread James Shaw (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Shaw updated KAFKA-14967:
---
Description: 
Calling {{CreateTopicsResult.topicId().get()}} throws 
{{{}NullPointerException{}}}, while {{KafkaAdminClient}} correctly returns the 
topicId.

Stacktrace:
{code:java}
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
   [snip]
Caused by: java.lang.NullPointerException
at 
org.apache.kafka.common.internals.KafkaFutureImpl.lambda$thenApply$0(KafkaFutureImpl.java:60)
at 
java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684)
at 
java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
at 
java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.thenApply(KafkaFutureImpl.java:58)
at 
org.apache.kafka.clients.admin.CreateTopicsResult.topicId(CreateTopicsResult.java:82)
... 85 more
{code}


Test case to reproduce:
{code:java}
 
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static java.util.Collections.singletonList;

public class MockAdminClientBug {
@Test
void shouldNotThrowNullPointerException() throws ExecutionException, 
InterruptedException {
Node controller = new Node(0, "mock", 0);
try (Admin admin = new MockAdminClient(singletonList(controller), 
controller)) {
CreateTopicsResult result = admin.createTopics(singletonList(new 
NewTopic("TestTopic", Optional.empty(), Optional.empty(;
Uuid topicId = result.topicId("TestTopic").get();
System.out.println(topicId);
}
}
}
{code}

  was:
Calling {{CreateTopicsResult.topicId().get()}} throws 
{{{}NullPointerException{}}}, while {{KafkaAdminClient}} correctly returns the 
topicId.

Stacktrace:
{code:java}
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(Invocat

[jira] [Updated] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult

2023-05-04 Thread James Shaw (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Shaw updated KAFKA-14967:
---
Description: 
Calling {{CreateTopicsResult.topicId().get()}} throws 
{{{}NullPointerException{}}}, while {{KafkaAdminClient}} correctly returns the 
topicId.

Stacktrace:
{code:java}
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(N

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960378


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer<Integer, String> producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer<Integer, String> get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");
+}
+key++;
+sentRecords++;
  

[jira] [Created] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult

2023-05-04 Thread James Shaw (Jira)
James Shaw created KAFKA-14967:
--

 Summary: MockAdminClient throws NullPointerException in 
CreateTopicsResult
 Key: KAFKA-14967
 URL: https://issues.apache.org/jira/browse/KAFKA-14967
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.4.0
Reporter: James Shaw


Calling {{CreateTopicsResult.topicId().get()}} throws {{NullPointerException}}, 
while {{KafkaAdminClient}} correctly returns the topicId.


Test case to reproduce:

{code:java} 
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static java.util.Collections.singletonList;

public class MockAdminClientBug {
@Test
void shouldNotThrowNullPointerException() throws ExecutionException, 
InterruptedException {
Node controller = new Node(0, "mock", 0);
try (Admin admin = new MockAdminClient(singletonList(controller), 
controller)) {
CreateTopicsResult result = admin.createTopics(singletonList(new 
NewTopic("TestTopic", Optional.empty(), Optional.empty(;
Uuid topicId = result.topicId("TestTopic").get();
System.out.println(topicId);
}
}
}
{code}





[GitHub] [kafka] fvaleri commented on a diff in pull request #13669: MINOR: Fix producer Callback comment

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13669:
URL: https://github.com/apache/kafka/pull/13669#discussion_r1184732376


##
clients/src/main/java/org/apache/kafka/clients/producer/Callback.java:
##
@@ -36,7 +36,7 @@ public interface Callback {
  *  Non-Retriable exceptions (fatal, the message will 
never be sent):
  *  
  *  InvalidTopicException
- *  OffsetMetadataTooLargeException
+ *  OffsetMetadataTooLarge

Review Comment:
   Hi @machi1990, that's a good idea.
   
   I think what you propose would help prevent errors like this. Done.
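
   Presumably the `{@link}` form suggested here would look something like 
this (sketch); a mistyped class name then surfaces as a javadoc error instead 
of rotting silently:
   
   ```java
   * <li>{@link org.apache.kafka.common.errors.OffsetMetadataTooLarge}</li>
   ```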






[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-05-04 Thread via GitHub


dajac commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1184723565


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -62,6 +65,39 @@ class LogCleanerTest {
 Utils.delete(tmpdir)
   }
 
+  @Test
+  def testRemoveMetricsOnClose(): Unit = {
+val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+try {
+  val logCleaner = new LogCleaner(new CleanerConfig(true),
+logDirs = Array(TestUtils.tempDir()),
+logs = new Pool[TopicPartition, UnifiedLog](),
+logDirFailureChannel = new LogDirFailureChannel(1),
+time = time)
+
+  // shutdown logCleaner so that metrics are removed
+  logCleaner.shutdown()
+
+  val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
+  val numMetricsRegistered = 5
+  verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())

Review Comment:
   That makes sense. Thanks for the clarification. I wonder if we could use 
`LogCleaner.MetricNames.size` instead of hardcoding `5`. I suppose it would 
have the same semantics, wouldn't it?
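
   For reference, a sketch of that assertion (assuming `MetricNames` is a 
collection exposed on the `LogCleaner` companion object):
   
   ```scala
   // Avoids hardcoding 5; the expected count tracks MetricNames automatically.
   verify(mockMetricsGroup, times(LogCleaner.MetricNames.size)).newGauge(anyString(), any())
   ```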






[GitHub] [kafka] dajac commented on pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-05-04 Thread via GitHub


dajac commented on PR #13666:
URL: https://github.com/apache/kafka/pull/13666#issuecomment-1534317488

   @kirktrue Thanks for your comments. I have addressed them.





[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-05-04 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1184718825


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {@link CoordinatorEvent} processor which uses an
+ * {@link EventAccumulator} that guarantees that events sharing a partition
+ * key are not processed concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List<EventProcessorThread> threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicating whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(threadPrefix + threadId)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends ShutdownableThread {
+EventProcessorThread(
+String name
+) {
+super(name, false);
+}
+
+@Override
+public void doWork() {
+while (!shuttingDown) {

Review Comment:
   It is not required, but I agree that it is confusing. I think I will just 
use a regular `Thread` because it better fits my needs.
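
   A minimal sketch of that plain-`Thread` variant (structure assumed, not 
the PR's code):
   
   ```java
   // Hypothetical sketch: a plain Thread whose loop exits once shuttingDown
   // is set, dropping ShutdownableThread's extra shutdown machinery.
   class EventProcessorThread extends Thread {
       EventProcessorThread(String name) {
           super(name);
       }
   
       @Override
       public void run() {
           while (!shuttingDown) {
               CoordinatorEvent event = accumulator.poll();
               if (event == null) continue; // nothing available; re-check the flag
   
               try {
                   event.run();
               } catch (Throwable t) {
                   log.error("Failed to run event {}.", event, t);
               }
           }
       }
   }
   ```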






[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-05-04 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1184688955


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+/**
+ * The base event type used by all events processed in the
+ * coordinator runtime.
+ */
+public interface CoordinatorEvent extends EventAccumulator.Event {
+/**
+ * Runs the event.
+ */
+void run();

Review Comment:
   Yeah, I was debating this as well. My conclusion was that `Runnable` is not 
needed anywhere; we never downcast the event to `Runnable`. Therefore I kept 
it simple and defined just the interface that I need.






[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-05-04 Thread via GitHub


dajac commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1184687089


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {@link CoordinatorEvent} processor which uses an
+ * {@link EventAccumulator} that guarantees that events sharing a partition
+ * key are not processed concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List<EventProcessorThread> threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContextThe log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreadsThe number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(threadPrefix + threadId)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends ShutdownableThread {
+EventProcessorThread(
+String name
+) {
+super(name, false);
+}
+
+@Override
+public void doWork() {
+while (!shuttingDown) {
+CoordinatorEvent event = accumulator.poll();
+if (event == null) continue;
+
+try {
+log.debug("Executing event " + event);
+event.run();
+} catch (Throwable t) {
+log.error("Failed to run event " + event + " due to: " + 
t, t);

Review Comment:
   Nope. It is just a bad habit, I suppose. Let me change them.
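
   i.e. switching to SLF4J placeholders, which also avoids building the 
message string when the log level is disabled:
   
   ```java
   // Before: the concatenation runs even when debug logging is off.
   log.debug("Executing event " + event);
   // After: placeholder form; passing the Throwable last keeps the stack trace.
   log.debug("Executing event {}", event);
   log.error("Failed to run event {}.", event, t);
   ```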





