[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185752497 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertE
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185749017 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertE
[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
showuon commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1185732188 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; 
this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
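The quoted diff is cut off before the send helpers it calls. As a rough sketch of what `syncSend` and `asyncSend` plausibly look like with the standard producer API (only the call sites appear in the diff; the method bodies here are assumptions):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;

// Sync mode: block on the Future returned by send() until the broker responds.
private void syncSend(KafkaProducer<Integer, String> producer, int key, String value)
        throws ExecutionException, InterruptedException {
    RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
    System.out.printf("Sent record(key=%d, value=%s), partition=%d, offset=%d%n",
        key, value, metadata.partition(), metadata.offset());
}

// Async mode: return immediately and handle the result in a callback.
private void asyncSend(KafkaProducer<Integer, String> producer, int key, String value) {
    producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
    });
}
```

Both helpers assume they live inside the `Producer` thread class, where `topic` is a field.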
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; 
this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
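The diff also moves producer construction out of the constructor into a `createKafkaProducer()` helper invoked from `run()` via try-with-resources. A sketch reconstructed from the removed constructor code visible above (the per-instance client id is an assumption; the old code used a fixed "DemoProducer" id):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

// Sketch: the constructor now only stores configuration, so the producer can
// be created and closed inside run() with try-with-resources.
private KafkaProducer<Integer, String> createKafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // A unique client id keeps metrics and logs distinguishable across threads.
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    if (transactionTimeoutMs > 0) {
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
    }
    if (transactionalId != null) {
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
    }
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
    return new KafkaProducer<>(props);
}
```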
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1535663733 There are indeed ways to recover from this issue, such as directly deleting the future log on disk and restarting the corresponding broker. Therefore, if the failed source partition is not deleted immediately when it is marked as failed, is it necessary to add a monitoring metric for this situation so that it can be detected in time? @divijvaidya @chia7712
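A minimal sketch of what such a metric could look like. Everything here, the class, the gauge name, and the hooks, is hypothetical; a real patch would register the gauge through the broker's internal metrics machinery:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch only: track how many partitions have a failed future-log copy so a
// monitoring system can alert before disk usage grows unchecked.
final class FailedFuturePartitionsMetric {
    private final AtomicInteger failedCount = new AtomicInteger(0);

    // Called when a ReplicaAlterLogDirsThread partition is marked failed.
    void markFailed() {
        failedCount.incrementAndGet();
    }

    // Called when the partition's future log is finally deleted or recovers.
    void markResolved() {
        failedCount.decrementAndGet();
    }

    // Exposed as a gauge (hypothetical name: "FailedFuturePartitionsCount").
    Supplier<Integer> gauge() {
        return failedCount::get;
    }
}
```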
[GitHub] [kafka] showuon commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding
showuon commented on code in PR #13312: URL: https://github.com/apache/kafka/pull/13312#discussion_r1185666434 ## clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java: ## @@ -241,6 +244,119 @@ public void testDouble() throws IOException { assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0L); } +@Test +public void testCorrectnessWriteUnsignedVarlong() { Review Comment: I agree. So maybe we can `disable` them for now, and add a comment for future use, like what we did [here](https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala#L1410). After all, the current varLong test just runs the same logic and verifies that the results match, right?
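For context, correctness tests like this typically compare an optimized encoder against the canonical LEB128-style reference loop. A sketch of that reference algorithm (illustrative, not the exact `ByteUtils` code under review):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Canonical unsigned varlong encoding: emit 7 payload bits per byte and set
// the high bit on every byte except the last.
static void writeUnsignedVarlong(long value, DataOutput out) throws IOException {
    while ((value & 0xffffffffffffff80L) != 0L) {
        out.writeByte((int) ((value & 0x7fL) | 0x80L));
        value >>>= 7;
    }
    out.writeByte((int) value);
}

// Matching decoder; rejects encodings longer than 10 bytes.
static long readUnsignedVarlong(DataInput in) throws IOException {
    long value = 0L;
    int shift = 0;
    byte b;
    while (((b = in.readByte()) & 0x80) != 0) {
        value |= (long) (b & 0x7f) << shift;
        shift += 7;
        if (shift > 63) throw new IllegalArgumentException("varlong is too long");
    }
    return value | ((long) b << shift);
}
```

A correctness test can then feed both implementations the same boundary values (0, 2^7 - 1, 2^7, ..., Long.MAX_VALUE) and assert identical byte output.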
[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
showuon commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1185665409 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; 
this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
[GitHub] [kafka] showuon commented on a diff in pull request #13660: KAFKA-14662: Update the ACL list in the doc
showuon commented on code in PR #13660: URL: https://github.com/apache/kafka/pull/13660#discussion_r1185661328 ## docs/security.html: ## @@ -2089,6 +2089,144 @@
[GitHub] [kafka] hudeqi commented on a diff in pull request #13617: MINOR:code optimization in QuorumController
hudeqi commented on code in PR #13617: URL: https://github.com/apache/kafka/pull/13617#discussion_r1185655736 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1635,7 +1636,7 @@ private enum ImbalanceSchedule { /** * Tracks if a snapshot generate was scheduled. */ -private boolean generateSnapshotScheduled = false; +private final boolean generateSnapshotScheduled; Review Comment: It seems that this variable has already been deleted in #13540 and can simply be ignored...
[GitHub] [kafka] hudeqi commented on a diff in pull request #13617: MINOR:code optimization in QuorumController
hudeqi commented on code in PR #13617: URL: https://github.com/apache/kafka/pull/13617#discussion_r1185653614 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1635,7 +1636,7 @@ private enum ImbalanceSchedule { /** * Tracks if a snapshot generate was scheduled. */ -private boolean generateSnapshotScheduled = false; +private final boolean generateSnapshotScheduled; Review Comment: > If the value is always set to `false`, why have the variable at all? 🤔 Hi, I also wanted to delete this variable directly because it is not used anywhere, but I am not sure whether it will be needed in subsequent development. @kirktrue
[jira] [Commented] (KAFKA-14914) binarySearch in AbstactIndex may execute with infinite loop
[ https://issues.apache.org/jira/browse/KAFKA-14914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719585#comment-17719585 ] Luke Chen commented on KAFKA-14914: --- [~flashmouse], thanks for reporting this issue. Does the offset index still exist? Could you upload these indexes for investigation? > binarySearch in AbstactIndex may execute with infinite loop > --- > > Key: KAFKA-14914 > URL: https://issues.apache.org/jira/browse/KAFKA-14914 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 2.4.0 > Reporter: li xiangyuan > Priority: Major > Attachments: stack.1.txt, stack.2.txt, stack.3.txt > > > Recently, our servers in the production environment have suddenly stopped handling requests (so far 3 times in less than 10 days). Please check the uploaded stack files: they show that one ioThread (data-plane-kafka-request-handler-11) holds the read lock of the partition's leaderIsrUpdateLock and keeps running the binarySearch function. Once any thread (kafka-scheduler-2) needs the write mode of this lock, all read requests for this partition waiting on the read lock use up all the ioThreads, and the broker can no longer handle any request. > The 3 stack files were captured at intervals of about 6 minutes. From my standpoint, the binarySearch function apparently causes the deadlock, and I suspect the index block values in the offset index (at least in the mmap) are not sorted. > > Detailed information: > this problem appeared on 2 brokers > broker version: 2.4.0 > jvm: openjdk 11 > hardware: aws c7g 4xlarge; this is an arm64 server. We recently upgraded our servers from c6g 4xlarge to this type; we never hit this problem on c6g, so we don't know whether arm or the aws c7g server has any problem. > other: once we restart the broker, it recovers, so we suspect the offset index file is not corrupted and that something may be wrong with the mmap. > Please give any suggestions to solve this problem, thanks.
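For reference, the lookup in question is a binary search over index entries, and both its termination argument and the usefulness of its result rely on invariants such as sorted, stable entries, which is exactly what the reporter suspects are violated. A generic sketch (not Kafka's AbstractIndex code, which searches a memory-mapped buffer rather than an array):

```java
// Textbook binary search over a sorted array. This version always terminates
// because the [lo, hi] range strictly shrinks each iteration; a search over a
// live memory-mapped index can misbehave if the invariants it relies on are
// broken underneath it, which is what this report suspects.
static int binarySearch(long[] entries, long target) {
    int lo = 0;
    int hi = entries.length - 1;
    while (lo <= hi) {
        int mid = lo + ((hi - lo) >>> 1); // avoids overflow of (lo + hi) / 2
        if (entries[mid] == target) return mid;
        if (entries[mid] < target) lo = mid + 1;
        else hi = mid - 1;
    }
    return -(lo + 1); // insertion-point encoding, as in java.util.Arrays
}
```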
[GitHub] [kafka] clayburn commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights
clayburn commented on code in PR #13676: URL: https://github.com/apache/kafka/pull/13676#discussion_r1185635216 ## build.gradle: ## @@ -39,7 +39,6 @@ plugins { id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.8" id "com.github.spotbugs" version '5.0.13' apply false - id 'org.gradle.test-retry' version '1.5.2' apply false Review Comment: That's what this PR proposes, publishing build scans to the Gradle Enterprise instance at https://ge.apache.org. You can see the configuration in the `settings.gradle` portion of the PR. Because the Gradle Enterprise Gradle Plugin bundles in the test-retry plugin, it is necessary to remove these lines. Otherwise test-retry will be loaded twice and fail. The retry functionality remains the same.
[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
mjsax commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1185622586 ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version): self.stop_and_await() +@cluster(num_nodes=6) +@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)]) +def test_rolling_upgrade_for_table_agg(self, from_version, to_version): +""" +This test verifies that the cluster successfully upgrades despite changes in the table +repartition topic format. + +Starts 3 KafkaStreams instances with version and upgrades one-by-one to +""" + +extra_properties = {'test.run_table_agg': 'true'} + +self.set_up_services() + +self.driver.start() + +# encoding different target values for different versions +# - old version: value=A +# - new version with `upgrade_from` flag set: value=B +# - new version w/o `upgrade_from` set set: value=C + +extra_properties = extra_properties.copy() +extra_properties['test.agg_produce_value'] = 'A' +extra_properties['test.expected_agg_values'] = 'A' +self.start_all_nodes_with(from_version, extra_properties) + +counter = 1 +random.seed() + +# rolling bounce +random.shuffle(self.processors) +p3 = self.processors[-1] +for p in self.processors: +p.CLEAN_NODE_ENABLED = False + +# bounce two instances to new version (verifies that new version can process records +# written by old version) Review Comment: We have a guarantee. Inside `do_stop_start_bounce` we wait until the processor prints that it has processed records. Not sure if we want to tighten this check, but I think it should be ok?
[GitHub] [kafka] bmscomp commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights
bmscomp commented on code in PR #13676: URL: https://github.com/apache/kafka/pull/13676#discussion_r1185622030 ## build.gradle: ## @@ -39,7 +39,6 @@ plugins { id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.8" id "com.github.spotbugs" version '5.0.13' apply false - id 'org.gradle.test-retry' version '1.5.2' apply false Review Comment: @clayburn is Apache Kafka using Gradle Enterprise?
[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
mjsax commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1185621879 ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version): self.stop_and_await() +@cluster(num_nodes=6) +@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)]) +def test_rolling_upgrade_for_table_agg(self, from_version, to_version): +""" +This test verifies that the cluster successfully upgrades despite changes in the table +repartition topic format. + +Starts 3 KafkaStreams instances with version and upgrades one-by-one to +""" + +extra_properties = {'test.run_table_agg': 'true'} + +self.set_up_services() + +self.driver.start() + +# encoding different target values for different versions +# - old version: value=A +# - new version with `upgrade_from` flag set: value=B +# - new version w/o `upgrade_from` set set: value=C + +extra_properties = extra_properties.copy() +extra_properties['test.agg_produce_value'] = 'A' +extra_properties['test.expected_agg_values'] = 'A' +self.start_all_nodes_with(from_version, extra_properties) + +counter = 1 +random.seed() + +# rolling bounce +random.shuffle(self.processors) +p3 = self.processors[-1] +for p in self.processors: +p.CLEAN_NODE_ENABLED = False + +# bounce two instances to new version (verifies that new version can process records +# written by old version) +extra_properties = extra_properties.copy() +extra_properties['test.agg_produce_value'] = 'B' +extra_properties['test.expected_agg_values'] = 'A,B' +for p in self.processors[:-1]: +self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties) Review Comment: Yes, `[n:m]` returns a sub-string (`from_version` is a string), from index `n` to `m` (`m` exclusive). If you omit `n` it starts from the beginning; if you omit `m` it goes to the end. A negative index counts from the end, so here we get "from the beginning" to "second to last", i.e., we cut off the last two chars (the `.x` bug-fix suffix).
[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
mjsax commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1185618918 ## streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java: ## @@ -106,7 +106,11 @@ private static class ValueList { } int next() { -return (index < values.length) ? values[index++] : -1; +final int v = values[index++]; +if (index >= values.length) { +index = 0; +} Review Comment: Seems the comment is outdated. The custom TS-extractor was removed years ago: https://github.com/apache/kafka/commit/52e397962b624f3c881b6f99e71c94da32cf6a33 Let me delete the comment.
[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719572#comment-17719572 ] A. Sophie Blee-Goldman commented on KAFKA-13891: Thanks [~pnee] – just to round out the above, the actual fix was in [pull/13550|https://github.com/apache/kafka/pull/13550] > sync group failed with rebalanceInProgress error cause rebalance many rounds > in coopeartive > --- > > Key: KAFKA-13891 > URL: https://issues.apache.org/jira/browse/KAFKA-13891 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 3.0.0 > Reporter: Shawn Wang > Assignee: Philip Nee > Priority: Major > Fix For: 3.5.0, 3.4.1 > > > This issue was first found in [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]. > But the previous PR forgot to reset the generation when the sync group failed with a rebalanceInProgress error. So the previous bug still exists, and it may cause the consumer to rebalance many rounds before it finally becomes stable. > Here's an example (*bold is added*): > # consumer A joined and synced the group successfully with generation 1 *(with ownedPartitions P1/P2)* > # a new rebalance started with generation 2; consumer A joined successfully, but somehow consumer A doesn't send out the sync group immediately > # the other consumers completed the sync group successfully in generation 2, except consumer A > # after consumer A sends out the sync group, a new rebalance starts with generation 3, so consumer A gets a REBALANCE_IN_PROGRESS error in the sync group response > # when receiving REBALANCE_IN_PROGRESS, we re-join the group with generation 3, but with the assignment (ownedPartition) from generation 1 > # so now an out-of-date ownedPartition is sent, with unexpected results > # *after the generation-3 rebalance, consumer A got the P3/P4 partitions; the ownedPartition is ignored because of the old generation* > # *consumer A revokes P1/P2 and re-joins to start a new round of rebalancing* > # *if some other consumer C fails to syncGroup before consumer A's joinGroup, the same issue happens again and results in many rounds of rebalancing before the group is stable* >
[GitHub] [kafka] bmscomp commented on pull request #13673: MINOR: Update dependencies (minor versions only)
bmscomp commented on PR #13673: URL: https://github.com/apache/kafka/pull/13673#issuecomment-1535547766 It's a good thing to keep dependencies up to date.
[jira] [Commented] (KAFKA-14016) Revoke more partitions than expected in Cooperative rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719570#comment-17719570 ] A. Sophie Blee-Goldman commented on KAFKA-14016: Nice job [~pnee] – thanks for taking the time to finally patch this. To clarify, since there are a lot of tickets and PRs floating around here, the assignor fix is in [pull/13550|https://github.com/apache/kafka/pull/13550] > Revoke more partitions than expected in Cooperative rebalance > - > > Key: KAFKA-14016 > URL: https://issues.apache.org/jira/browse/KAFKA-14016 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 3.3.0 > Reporter: Shawn Wang > Assignee: Philip Nee > Priority: Major > Labels: new-rebalance-should-fix > Fix For: 3.5.0, 3.4.1 > > Attachments: CooperativeStickyAssignorBugReproduction.java > > > In https://issues.apache.org/jira/browse/KAFKA-13419 we found that some consumers didn't reset the generation and state after the sync group failed with a REBALANCE_IN_PROGRESS error. > So we fixed it by resetting the generationId (but not the memberId) when the sync group fails with a REBALANCE_IN_PROGRESS error. > But this change missed the reset part, so another change made in https://issues.apache.org/jira/browse/KAFKA-13891 makes this work. > After applying this change, we found that sometimes a consumer will revoke almost 2/3 of its partitions with cooperative rebalancing enabled, because if a consumer does a very quick re-join, other consumers will get REBALANCE_IN_PROGRESS in syncGroup and revoke their partitions before re-joining. Example: > # consumers A1-A10 (ten consumers) joined and synced the group successfully with generation 1 > # new consumer B1 joined and started a rebalance > # all consumers joined successfully, and then A1 needs to revoke a partition to transfer it to B1 > # A1 does a very quick syncGroup and re-join, because it revoked a partition > # A2-A10 didn't send syncGroup before A1's re-join, so after they send syncGroup they will get REBALANCE_IN_PROGRESS > # A2-A10 will revoke their partitions and re-join > So in this rebalance almost every partition is revoked, which greatly decreases the benefit of cooperative rebalancing. > I think instead of "*resetStateAndRejoin* when *RebalanceInProgressException* errors happen in *sync group*" we need another way to fix it. > Here is my proposal: > # revert the change in https://issues.apache.org/jira/browse/KAFKA-13891 > # in the server coordinator's handleSyncGroup, when the generationId has been checked and the group state is PreparingRebalance, we can send the assignment along with the error code REBALANCE_IN_PROGRESS (I think it's safe since we verified the generation first) > # when the client gets the REBALANCE_IN_PROGRESS error, try to apply the assignment first and then set rejoinNeeded = true to make it re-join immediately
[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
cmccabe commented on code in PR #13461: URL: https://github.com/apache/kafka/pull/13461#discussion_r1185605065 ## metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.migration; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.PartitionRegistration; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CapturingTopicMigrationClient implements TopicMigrationClient { +public List deletedTopics = new ArrayList<>(); +public List createdTopics = new ArrayList<>(); +public LinkedHashMap> updatedTopicPartitions = new LinkedHashMap<>(); + +public void reset() { +createdTopics.clear(); +updatedTopicPartitions.clear(); +deletedTopics.clear(); +} + + +@Override +public void iterateTopics(EnumSet interests, TopicVisitor visitor) { + Review Comment: OK.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
cmccabe commented on code in PR #13461: URL: https://github.com/apache/kafka/pull/13461#discussion_r1185604698 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.migration; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.image.AclsDelta; +import org.apache.kafka.image.AclsImage; +import org.apache.kafka.image.ClientQuotasDelta; +import org.apache.kafka.image.ClientQuotasImage; +import org.apache.kafka.image.ConfigurationsDelta; +import org.apache.kafka.image.ConfigurationsImage; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.authorizer.StandardAcl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class KRaftMigrationZkWriter { +private final MigrationClient migrationClient; +private final BiConsumer operationConsumer; + +public KRaftMigrationZkWriter( +MigrationClient migrationClient, +BiConsumer operationConsumer +) { +this.migrationClient = migrationClient; +this.operationConsumer = operationConsumer; +} + +public void handleSnapshot(MetadataImage image) { +handleTopicsSnapshot(image.topics()); +handleConfigsSnapshot(image.configs()); +handleClientQuotasSnapshot(image.clientQuotas()); +operationConsumer.accept("Setting next producer ID", migrationState -> + migrationClient.writeProducerId(image.producerIds().highestSeenProducerId(), migrationState)); +handleAclsSnapshot(image.acls()); +} + +public void handleDelta(MetadataImage previousImage, MetadataImage image, MetadataDelta delta) { Review Comment: as per the above discussion, this is not needed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] cmccabe commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
cmccabe commented on code in PR #13461: URL: https://github.com/apache/kafka/pull/13461#discussion_r1185604580 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.migration; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.image.AclsDelta; +import org.apache.kafka.image.AclsImage; +import org.apache.kafka.image.ClientQuotasDelta; +import org.apache.kafka.image.ClientQuotasImage; +import org.apache.kafka.image.ConfigurationsDelta; +import org.apache.kafka.image.ConfigurationsImage; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.authorizer.StandardAcl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class KRaftMigrationZkWriter { +private final MigrationClient migrationClient; +private final BiConsumer operationConsumer; + +public KRaftMigrationZkWriter( +MigrationClient migrationClient, +BiConsumer operationConsumer +) { +this.migrationClient = migrationClient; +this.operationConsumer = operationConsumer; +} + +public void handleSnapshot(MetadataImage image) { Review Comment: hmm, I guess that's true. we don't need it since the brokers shouldn't be reading (as opposed to writing) there... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
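For readers following the snapshot discussion: the quoted handleSnapshot shows the writer's core pattern, in which every ZK mutation is wrapped in a named operation and handed to an operationConsumer that can log the step and thread the migration state through. Below is a minimal Java sketch of that shape; MigrationState and MigrationOperation are simplified stand-ins for the real ZkMigrationLeadershipState and operation types, not the PR's actual code.

```java
import java.util.function.BiConsumer;

// Simplified stand-in: the real writer threads a ZkMigrationLeadershipState
// (which carries a ZK version) through every operation.
class MigrationState {
    final int zkVersion;
    MigrationState(int zkVersion) { this.zkVersion = zkVersion; }
}

@FunctionalInterface
interface MigrationOperation {
    MigrationState apply(MigrationState state);
}

public class OperationConsumerSketch {
    public static void main(String[] args) {
        MigrationState[] current = { new MigrationState(0) };

        // The consumer receives a human-readable name plus the operation itself,
        // so it can log each step and retain the newest migration state.
        BiConsumer<String, MigrationOperation> operationConsumer = (name, op) -> {
            System.out.println("Applying: " + name);
            current[0] = op.apply(current[0]);
        };

        // Mirrors the shape of the quoted handleSnapshot call.
        operationConsumer.accept("Setting next producer ID",
            state -> new MigrationState(state.zkVersion + 1));
    }
}
```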
[GitHub] [kafka] mumrah commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
mumrah commented on code in PR #13461: URL: https://github.com/apache/kafka/pull/13461#discussion_r1185602073 ## metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.migration; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.PartitionRegistration; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CapturingTopicMigrationClient implements TopicMigrationClient { +public List deletedTopics = new ArrayList<>(); +public List createdTopics = new ArrayList<>(); +public LinkedHashMap> updatedTopicPartitions = new LinkedHashMap<>(); + +public void reset() { +createdTopics.clear(); +updatedTopicPartitions.clear(); +deletedTopics.clear(); +} + + +@Override +public void iterateTopics(EnumSet interests, TopicVisitor visitor) { + Review Comment: Not all of the usages implement iterateTopics. For example, tests that just exercise the KRaft delta -> ZK path -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
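As context for the capturing client under discussion: a capturing test double records the mutations it is asked to perform so tests can assert on them afterwards, and methods a given test never exercises (such as iterateTopics on the delta-only paths mentioned above) can remain no-ops. A hedged sketch with hypothetical, trimmed-down interfaces, not the real TopicMigrationClient API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down stand-ins for the real interfaces.
interface TopicVisitor {
    void visitTopic(String topicName);
}

interface TopicClient {
    void deleteTopic(String topicName);
    void iterateTopics(TopicVisitor visitor);
}

// The capturing double records calls instead of touching ZooKeeper.
class CapturingTopicClient implements TopicClient {
    final List<String> deletedTopics = new ArrayList<>();

    @Override
    public void deleteTopic(String topicName) {
        deletedTopics.add(topicName);
    }

    @Override
    public void iterateTopics(TopicVisitor visitor) {
        // Intentionally a no-op: tests that only exercise the
        // KRaft delta -> ZK path never iterate existing topics.
    }
}
```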
[GitHub] [kafka] mumrah commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
mumrah commented on code in PR #13461: URL: https://github.com/apache/kafka/pull/13461#discussion_r1185600997 ## core/src/main/scala/kafka/zk/migration/ZkAclMigrationClient.scala: ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.zk.migration + +import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls} +import kafka.security.authorizer.{AclAuthorizer, AclEntry} +import kafka.utils.Logging +import kafka.zk.ZkMigrationClient.wrapZkException +import kafka.zk.{KafkaZkClient, ResourceZNode, ZkAclStore, ZkVersion} +import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest} +import org.apache.kafka.common.acl.AccessControlEntry +import org.apache.kafka.common.resource.ResourcePattern +import org.apache.kafka.metadata.migration.{AclMigrationClient, MigrationClientException, ZkMigrationLeadershipState} +import org.apache.zookeeper.CreateMode +import org.apache.zookeeper.KeeperException.Code + +import java.util +import java.util.function.BiConsumer +import scala.jdk.CollectionConverters._ + +class ZkAclMigrationClient( + zkClient: KafkaZkClient +) extends AclMigrationClient with Logging { + + private def aclChangeNotificationRequest(resourcePattern: ResourcePattern): CreateRequest = { +// ZK broker needs the ACL change notification znode to be updated in order to process the new ACLs +val aclChange = ZkAclStore(resourcePattern.patternType).changeStore.createChangeNode(resourcePattern) +CreateRequest(aclChange.path, aclChange.bytes, zkClient.defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL) + } + + private def tryWriteAcls( +resourcePattern: ResourcePattern, +aclEntries: Set[AclEntry], +create: Boolean, +state: ZkMigrationLeadershipState + ): Option[ZkMigrationLeadershipState] = wrapZkException { +val aclData = ResourceZNode.encode(aclEntries) + +val request = if (create) { + val path = ResourceZNode.path(resourcePattern) + CreateRequest(path, aclData, zkClient.defaultAcls(path), CreateMode.PERSISTENT) +} else { + SetDataRequest(ResourceZNode.path(resourcePattern), aclData, ZkVersion.MatchAnyVersion) +} + +val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state) +if (responses.head.resultCode.equals(Code.NONODE)) { + // Need to call this method again with create=true + None +} else { + // Write the ACL notification outside of a metadata multi-op + zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern)) + Some(state.withMigrationZkVersion(migrationZkVersion)) +} + } + + override def writeResourceAcls( +resourcePattern: ResourcePattern, +aclsToWrite: util.Collection[AccessControlEntry], +state: ZkMigrationLeadershipState + ): ZkMigrationLeadershipState = { +val acls = aclsToWrite.asScala.map(new 
AclEntry(_)).toSet +tryWriteAcls(resourcePattern, acls, create = false, state) match { + case Some(newState) => newState + case None => tryWriteAcls(resourcePattern, acls, create = true, state) match { +case Some(newState) => newState +case None => throw new MigrationClientException(s"Could not write ACLs for resource pattern $resourcePattern") + } +} + } + + override def deleteResource( +resourcePattern: ResourcePattern, +state: ZkMigrationLeadershipState + ): ZkMigrationLeadershipState = { +val request = DeleteRequest(ResourceZNode.path(resourcePattern), ZkVersion.MatchAnyVersion) +val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state) +if (responses.head.resultCode.equals(Code.OK) || responses.head.resultCode.equals(Code.NONODE)) { + // Write the ACL notification outside of a metadata multi-op + zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern)) + state.withMigrationZkVersion(migrationZkVersion) +} else { + throw new MigrationClientException(s"Could not delete ACL for resource pattern $resourcePattern") +} + } + + override def iterateAcls( +aclConsumer: BiConsumer[ResourcePattern, util
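The tryWriteAcls flow quoted above is a common ZooKeeper create-or-update dance: attempt an update first, and if the node does not exist (NONODE), retry the whole write with create = true. A hedged Java sketch of just that control flow, with the ZK client abstracted behind a toy interface:

```java
import java.util.Optional;

// Toy result codes and client, standing in for ZooKeeper's KeeperException codes.
enum Code { OK, NONODE }

interface ZkWriter {
    Code write(String path, byte[] data, boolean create);
}

public class CreateOrUpdateSketch {

    // Returns the new version on success, or empty when the node was missing
    // and the caller should retry the whole write with create = true.
    static Optional<Integer> tryWrite(ZkWriter zk, String path, byte[] data, boolean create, int version) {
        if (zk.write(path, data, create) == Code.NONODE) {
            return Optional.empty();
        }
        return Optional.of(version + 1);
    }

    static int writeAcls(ZkWriter zk, String path, byte[] data, int version) {
        // Update first; fall back to create; fail loudly if both attempts miss.
        return tryWrite(zk, path, data, false, version)
            .orElseGet(() -> tryWrite(zk, path, data, true, version)
                .orElseThrow(() -> new RuntimeException("Could not write ACLs at " + path)));
    }
}
```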
[GitHub] [kafka] mumrah commented on a diff in pull request #13461: KAFKA-14840: Support for snapshots during ZK migration
mumrah commented on code in PR #13461: URL: https://github.com/apache/kafka/pull/13461#discussion_r1185600867 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.migration; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.image.AclsDelta; +import org.apache.kafka.image.AclsImage; +import org.apache.kafka.image.ClientQuotasDelta; +import org.apache.kafka.image.ClientQuotasImage; +import org.apache.kafka.image.ConfigurationsDelta; +import org.apache.kafka.image.ConfigurationsImage; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.authorizer.StandardAcl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class KRaftMigrationZkWriter { +private final MigrationClient migrationClient; +private final BiConsumer operationConsumer; + +public KRaftMigrationZkWriter( +MigrationClient migrationClient, +BiConsumer operationConsumer +) { +this.migrationClient = migrationClient; +this.operationConsumer = operationConsumer; +} + +public void handleSnapshot(MetadataImage image) { Review Comment: I thought we decided not to write out the KRaft broker registrations to ZK? So far, we have not implemented that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bmscomp commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0
bmscomp commented on PR #13662: URL: https://github.com/apache/kafka/pull/13662#issuecomment-1535524442 @divijvaidya this pull request targets an update to version 2.15.0. Could you explain in more detail what you mean about updating more than just minor versions?
[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test
mjsax commented on PR #13654: URL: https://github.com/apache/kafka/pull/13654#issuecomment-1535516492 Ok. Dug into this a little bit, and some tests are broken because KS is broken. The original PR for https://issues.apache.org/jira/browse/KAFKA-13769 broke KS, but it also introduced randomized `application.id`s into the tests, which masked the error it introduced. The issue was reported later via https://issues.apache.org/jira/browse/KAFKA-14646 and we fixed it forward. However, the fact remains that rolling-bounce upgrades are broken when any of the 2.4 - 3.0 releases is used as "from_version". I updated this PR to disable the corresponding tests. Retriggered system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5658/
[GitHub] [kafka] junrao commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
junrao commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1184289360 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1160,48 +1172,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { +val key = new TopicPartitionOperationKey(remoteFetchInfo.get.topicPartition.topic(), remoteFetchInfo.get.topicPartition.partition()) +val remoteFetchResult = new CompletableFuture[RemoteLogReadResult] +var remoteFetchTask: Future[Void] = null +try { + remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo.get, (result: RemoteLogReadResult) => { +remoteFetchResult.complete(result) +delayedRemoteFetchPurgatory.checkAndComplete(key) + }) +} catch { + // if the task queue of remote storage reader thread pool is full, return what we currently have + // (the data read from local log segment for the other topic-partitions) and an error for the topic-partition that + // we couldn't read from remote storage + case e: RejectedExecutionException => +val fetchPartitionData = logReadResults.map { case (tp, result) => + val r = { +if (tp.topicPartition().equals(remoteFetchInfo.get.topicPartition)) + createLogReadResult(e) +else + result + } + + tp -> r.toFetchPartitionData(false) +} +responseCallback(fetchPartitionData) +return +} + +// If there is remote data, we will read remote data, instead of waiting for new data. +val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo.get, + fetchPartitionStatus, params, logReadResults, this, responseCallback) + +delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) + } else { +// If there is not enough data to respond and there is no remote data, we will let the fetch request +// to wait for new data. Review Comment: let the fetch request to wait => let the fetch request wait ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1320,6 +1405,33 @@ class ReplicaManager(val config: KafkaConfig, result } + private def createLogReadResult(highWatermark: Long, Review Comment: Should this be in `object ReplicaManager`? 
## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1320,6 +1405,33 @@ class ReplicaManager(val config: KafkaConfig, result } + private def createLogReadResult(highWatermark: Long, + leaderLogStartOffset: Long, + leaderLogEndOffset: Long, + e: Throwable) = { +LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), + divergingEpoch = None, + highWatermark, + leaderLogStartOffset, + leaderLogEndOffset, + followerLogStartOffset = -1L, + fetchTimeMs = -1L, + lastStableOffset = None, + exception = Some(e)) + } + + def createLogReadResult(e: Throwable): LogReadResult = { Review Comment: Should this be in `object ReplicaManager`? ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,25 +622,208 @@ public String toString() { } } -long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { -Optional offset = Optional.empty(); -Optional maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); -if (maybeLog.isPresent()) { -UnifiedLog log = maybeLog.get(); -Option maybeLeaderEpochFileCache = log.leaderEpochCache(); -if (maybeLeaderEpochFileCache.isDefined()) { -LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); -OptionalInt epoch
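The ReplicaManager change reviewed here wires an asynchronous remote read into a dedicated purgatory: the reader thread completes a CompletableFuture and then pokes the purgatory so the waiting DelayedRemoteFetch can finish, while a RejectedExecutionException from a saturated reader pool falls back to answering immediately with the local results plus an error. A minimal sketch of that completion pattern (executor and result types are simplified assumptions, not the PR's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RemoteFetchCompletionSketch {
    public static void main(String[] args) {
        ExecutorService readerPool = Executors.newFixedThreadPool(1);
        CompletableFuture<String> remoteFetchResult = new CompletableFuture<>();

        try {
            readerPool.submit(() -> {
                // Simulate reading a segment from remote storage.
                remoteFetchResult.complete("remote-data");
                // In the real code this is delayedRemoteFetchPurgatory.checkAndComplete(key),
                // which lets the waiting DelayedRemoteFetch operation finish.
                System.out.println("purgatory poked");
            });
        } catch (RejectedExecutionException e) {
            // Mirrors the quoted fallback: if the reader pool is saturated, answer
            // immediately with the local results plus an error for this partition.
            remoteFetchResult.completeExceptionally(e);
        }

        System.out.println(remoteFetchResult.join());
        readerPool.shutdown();
    }
}
```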
[GitHub] [kafka] sfc-gh-japatel commented on pull request #12647: KAFKA-14191: Add end-to-end latency metrics to Connectors
sfc-gh-japatel commented on PR #12647: URL: https://github.com/apache/kafka/pull/12647#issuecomment-1535462677 Hi team, any progress/plans on this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1185544285 ## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ## @@ -3273,17 +3274,46 @@ class PartitionTest extends AbstractPartitionTest { baseOffset = 0L, producerId = producerId) partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching) -assertFalse(partition.hasOngoingTransaction(producerId)) +assertEquals(OptionalLong.empty(), txnFirstOffset(producerId)) Review Comment: will fix duplicate code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1185533261 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -1090,6 +1090,9 @@ private void maybeFailWithError() { } else if (lastError instanceof InvalidProducerEpochException) { throw new InvalidProducerEpochException("Producer with transactionalId '" + transactionalId + "' and " + producerIdAndEpoch + " attempted to produce with an old epoch"); +} else if (lastError instanceof IllegalStateException) { +throw new IllegalStateException("Producer with transactionalId '" + transactionalId Review Comment: Message now says: ``` Producer with transactionalId '' and cannot execute transactional method because of previous invalid state transition attempt ``` The `IllegalStateException` thrown from `maybeFailWithError` includes `lastError` as its `cause`. The stack trace would include the message for `lastError` which reads: ``` TransactionalId : Invalid transition attempted from state to state ``` Here's what it looks like from one of the unit tests: ``` Producer with transactionalId 'foobar' and ProducerIdAndEpoch(producerId=13131, epoch=1) cannot execute transactional method because of previous invalid state transition attempt java.lang.IllegalStateException: Producer with transactionalId 'foobar' and ProducerIdAndEpoch(producerId=13131, epoch=1) cannot execute transactional method because of previous invalid state transition attempt at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1112) at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:324) at org.apache.kafka.clients.producer.internals.TransactionManagerTest.testBackgroundInvalidStateTransitionIsFatal(TransactionManagerTest.java:3422) . . . Caused by: java.lang.IllegalStateException: TransactionalId foobar: Invalid transition attempted from state READY to state ABORTABLE_ERROR at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1070) at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:467) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:707) at org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:714) at org.apache.kafka.clients.producer.internals.TransactionManagerTest.lambda$testBackgroundInvalidStateTransitionIsFatal$139(TransactionManagerTest.java:3418) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3083) at org.apache.kafka.clients.producer.internals.TransactionManagerTest.testBackgroundInvalidStateTransitionIsFatal(TransactionManagerTest.java:3418) ... 84 more ``` Does that seem like sufficient context? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
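The point about including lastError as the cause is standard exception chaining: the wrapper message explains which transactional method failed, while getCause() preserves the original invalid-transition message in the stack trace, exactly as the pasted trace shows. A tiny, self-contained illustration (message shapes are illustrative):

```java
public class CauseChainingExample {
    public static void main(String[] args) {
        // The underlying failure recorded as lastError.
        IllegalStateException lastError = new IllegalStateException(
            "TransactionalId foobar: Invalid transition attempted from state READY to state ABORTABLE_ERROR");

        // Wrapping with a cause keeps both messages visible: the wrapper names the
        // failed transactional method, while getCause() carries the original failure.
        IllegalStateException wrapped = new IllegalStateException(
            "Producer with transactionalId 'foobar' cannot execute transactional method "
                + "because of previous invalid state transition attempt", lastError);

        wrapped.printStackTrace();
    }
}
```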
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1185525137 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -3405,6 +3406,54 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); } +@Test +public void testMakeIllegalTransitionFatal() { +doInitTransactions(); +assertTrue(transactionManager.isTransactional()); + +// Step 1: create a transaction. +transactionManager.beginTransaction(); +assertTrue(transactionManager.hasOngoingTransaction()); + +// Step 2: abort a transaction (wait for it to complete) and then verify that the transaction manager is +// left in the READY state. +TransactionalRequestResult abortResult = transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND); +runUntil(abortResult::isCompleted); +abortResult.await(); +assertTrue(abortResult.isSuccessful()); +assertFalse(transactionManager.hasOngoingTransaction()); +assertTrue(transactionManager.isReady()); + +// Step 3: create a batch and simulate the Sender handling a failed batch, which would *attempt* to put +// the transaction manager in the ABORTABLE_ERROR state. However, that is an illegal state transition, so +// verify that it failed and caused the transaction manager to be put in an unrecoverable FATAL_ERROR state. +ProducerBatch batch = batchWithValue(tp0, "test"); +assertThrowsFatalStateException("handleFailedBatch", () -> transactionManager.handleFailedBatch(batch, new NetworkException("Disconnected from node 4"), false)); +assertTrue(transactionManager.hasFatalError()); + +// Step 4: validate that the transactions can't be started, committed +assertThrowsFatalStateException("beginTransaction", () -> transactionManager.beginTransaction()); +assertThrowsFatalStateException("beginAbort", () -> transactionManager.beginAbort(TransactionManager.InvalidStateDetectionStrategy.FOREGROUND)); +assertThrowsFatalStateException("beginCommit", () -> transactionManager.beginCommit()); +assertThrowsFatalStateException("maybeAddPartition", () -> transactionManager.maybeAddPartition(tp0)); +assertThrowsFatalStateException("initializeTransactions", () -> transactionManager.initializeTransactions()); +assertThrowsFatalStateException("sendOffsetsToTransaction", () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id"))); +} + +private void assertThrowsFatalStateException(String methodName, Runnable operation) { +try { +operation.run(); +} catch (KafkaException t) { Review Comment: I added a check for `IllegalStateException` in `maybeFailWithError`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
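A helper like the assertThrowsFatalStateException shown in the quoted test is typically a thin wrapper over JUnit 5's assertThrows; one possible shape, hedged and not necessarily the PR's final code:

```java
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

class FatalStateAssertions {

    // Runs the operation and asserts it fails with an IllegalStateException,
    // tagging any assertion failure with the name of the method under test.
    static void assertThrowsFatalStateException(String methodName, Runnable operation) {
        IllegalStateException e = assertThrows(
            IllegalStateException.class,
            operation::run,
            methodName + " should throw while the transaction manager is in FATAL_ERROR");
        assertNotNull(e.getMessage(), methodName + " should carry a descriptive message");
    }
}
```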
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1185524662 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -3405,6 +3406,52 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t assertEquals(1, transactionManager.sequenceNumber(tp1).intValue()); } +@Test +public void testMakeIllegalTransitionFatal() { Review Comment: Added requested tests for foreground errors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13640: URL: https://github.com/apache/kafka/pull/13640#issuecomment-1535394647 > With this PR I consistently get failures with the following tests Thanks for catching that. I was not setting the client ID to the correct value for the `KafkaAdminClient`'s `NetworkClient`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clayburn commented on a diff in pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights
clayburn commented on code in PR #13676: URL: https://github.com/apache/kafka/pull/13676#discussion_r1185496911 ## build.gradle: ## @@ -39,7 +39,6 @@ plugins { id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.8" id "com.github.spotbugs" version '5.0.13' apply false - id 'org.gradle.test-retry' version '1.5.2' apply false Review Comment: The test-retry plugin is bundled with the Gradle Enterprise plugin. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clayburn opened a new pull request, #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights
clayburn opened a new pull request, #13676: URL: https://github.com/apache/kafka/pull/13676 This PR publishes a build scan for every CI build on Jenkins and GitHub Actions, and for every local build from an authenticated Apache committer. The build will not fail if publishing fails. The build scans of the Apache Kafka project are published to the Gradle Enterprise instance at [ge.apache.org](https://ge.apache.org/), hosted by the Apache Software Foundation and run in partnership between the ASF and Gradle. This Gradle Enterprise instance has all features and extensions enabled and is freely available for use by the Apache Kafka project and all other Apache projects. This pull request replaces the existing publishing of build scans to the publicly available [scans.gradle.com](https://scans.gradle.com/) with publishing to [ge.apache.org](https://ge.apache.org/). On this Gradle Enterprise instance, Apache Kafka will have access not only to all of the published build scans but also to other aggregate data features such as:
- Dashboards to view all historical build scans, along with performance trends over time
- Build failure analytics for enhanced investigation and diagnosis of build failures
- Test failure analytics to better understand trends and causes around slow, failing, and flaky tests

If interested in exploring a fully populated Gradle Enterprise instance, please explore the builds already connected to [ge.apache.org](https://ge.apache.org/), the [Spring project’s instance](https://ge.spring.io/scans?search.relativeStartTime=P28D&search.timeZoneId=Europe/Zurich), or any number of other [OSS projects](https://gradle.com/enterprise-customers/oss-projects/) for which we sponsor instances of Gradle Enterprise. Please let me know if there are any questions about the value of Gradle Enterprise or the changes in this pull request; I’d be happy to address them.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
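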
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185475014 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +ass
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185460028 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilderTest.java: ## @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CurrentAssignmentBuilderTest { + +@Test +public void testTransitionFromNewTargetToRevoke() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 3, 4, 5), +mkTopicAssignment(topicId2, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.REVOKING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +assertEquals(10, updatedMember.memberEpoch()); +assertEquals(11, updatedMember.nextMemberEpoch()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 3), +mkTopicAssignment(topicId2, 6) +), updatedMember.assignedPartitions()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 1, 2), +mkTopicAssignment(topicId2, 4, 5) +), updatedMember.partitionsPendingRevocation()); +assertEquals(mkAssignment( +mkTopicAssignment(topicId1, 4, 5), +mkTopicAssignment(topicId2, 7, 8) +), updatedMember.partitionsPendingAssignment()); +} + +@Test +public void testTransitionFromNewTargetToAssigning() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); + +ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.setNextMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3), +mkTopicAssignment(topicId2, 4, 5, 6))) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.STABLE, member.state()); + +Assignment targetAssignment = new Assignment(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3, 4, 5), +mkTopicAssignment(topicId2, 4, 5, 6, 7, 8) +)); + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(11, targetAssignment) +.withCurrentPartitionEpoch((topicId, partitionId) -> 10) +.build(); + +assertEquals(ConsumerGroupMember.MemberState.ASSIGNING, updatedMember.state()); +assertEquals(10, updatedMember.previousMemberEpoch()); +ass
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
jeffkbkim commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r1185446129 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,560 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used + * by the new group coordinator (KIP-848). + */ +public class GenericGroupMember { + +/** + * A builder allowing to create a new generic member or update an + * existing one. + * + * Please refer to the javadoc of {{@link GenericGroupMember}} for the + * definition of the fields. 
+ */ +public static class Builder { +private final String memberId; +private Optional groupInstanceId = Optional.empty(); +private String clientId = ""; +private String clientHost = ""; +private int rebalanceTimeoutMs = -1; +private int sessionTimeoutMs = -1; +private String protocolType = ""; +private List supportedProtocols = Collections.emptyList(); +private byte[] assignment = new byte[0]; + +public Builder(String memberId) { +this.memberId = Objects.requireNonNull(memberId); +} + +public Builder(GenericGroupMember member) { +Objects.requireNonNull(member); + +this.memberId = member.memberId; +this.groupInstanceId = member.groupInstanceId; +this.rebalanceTimeoutMs = member.rebalanceTimeoutMs; +this.sessionTimeoutMs = member.sessionTimeoutMs; +this.clientId = member.clientId; +this.clientHost = member.clientHost; +this.protocolType = member.protocolType; +this.supportedProtocols = member.supportedProtocols; +this.assignment = member.assignment; +} + +public Builder setGroupInstanceId(Optional groupInstanceId) { +this.groupInstanceId = groupInstanceId; +return this; +} + +public Builder setClientId(String clientId) { +this.clientId = clientId; +return this; +} + +public Builder setClientHost(String clientHost) { +this.clientHost = clientHost; +return this; +} + +public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) { +this.rebalanceTimeoutMs = rebalanceTimeoutMs; +return this; +} + +public Builder setSessionTimeoutMs(int sessionTimeoutMs) { +this.sessionTimeoutMs = sessionTimeoutMs; +return this; +} + +public Builder setProtocolType(String protocolType) { +this.protocolType = protocolType; +return this; +} + +public Builder setSupportedProtocols(List protocols) { +this.supportedProtocols = protocols; +return this; +} + +public Builder setAssignment(byte[] assignment) { +this.assignment = assignment; +return this; +} + +public GenericGroupMember build() { +return new GenericGroupMember( +memberId, +groupInstanceId, +clientId, +clientHost, +rebalanceTimeoutMs, +sessionTimeoutMs, +protocolType, +supportedProtocols, +assignment +); +} +} + +private static class MemberSummary { Review Comment: got it. i'll remove it in this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To
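Since the review touches the Builder, here is a quick usage example of the fluent style it enables; the values are illustrative and the snippet assumes the GenericGroupMember class quoted above is on the classpath:

```java
// Illustrative values only.
GenericGroupMember member = new GenericGroupMember.Builder("member-1")
    .setClientId("client-1")
    .setClientHost("/127.0.0.1")
    .setRebalanceTimeoutMs(30000)
    .setSessionTimeoutMs(10000)
    .setProtocolType("consumer")
    .build();
```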
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
jeffkbkim commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r1185444113 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class encapsulates a generic group member's metadata. + * + * Member metadata contains the following: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout + * 2. timestamp of the latest heartbeat + * + * Protocol metadata: + * 1. the list of supported protocols (ordered by preference) + * 2. the metadata associated with each protocol + * + * In addition, it also contains the following state information: + * + * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state, + * its rebalance callback will be kept in the metadata if the + * member has sent the join group request + * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback + *is kept in metadata until the leader provides the group assignment + *and the group transitions to stable + */ +public class GenericGroupMember { + +private static class MemberSummary { +private final String memberId; +private final Optional groupInstanceId; +private final String clientId; +private final String clientHost; +private final byte[] metadata; +private final byte[] assignment; + +public MemberSummary(String memberId, + Optional groupInstanceId, + String clientId, + String clientHost, + byte[] metadata, + byte[] assignment) { + +this.memberId = memberId; +this.groupInstanceId = groupInstanceId; +this.clientId = clientId; +this.clientHost = clientHost; +this.metadata = metadata; +this.assignment = assignment; +} + +public String memberId() { +return memberId; +} + +public Optional getGroupInstanceId() { +return groupInstanceId; +} + +public String clientId() { +return clientId; +} + +public String clientHost() { +return clientHost; +} + +public byte[] metadata() { +return metadata; +} + +public byte[] assignment() { +return assignment; +} + +} + +/** + * The member id. + */ +private final String memberId; + +/** + * The group instance id. + */ +private final Optional groupInstanceId; + +/** + * The client id. + */ +private final String clientId; + +/** + * The client host. 
+ */ +private final String clientHost; + +/** + * The rebalance timeout in milliseconds. + */ +private final int rebalanceTimeoutMs; + +/** + * The session timeout in milliseconds. + */ +private final int sessionTimeoutMs; + +/** + * The protocol type. + */ +private final String protocolType; + +/** + * The list of supported protocols. + */ +private final List supportedProtocols; + +/** + * The assignment stored by the client assignor. + */ +private final byte[] assignment; + +/** + * The callback that is invoked once this member joins the group. + */ +private CompletableFuture awaitingJoinCallback = null; + +/** + * The callback that is invoked once this member completes the sync group phase. + */ +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
jeffkbkim commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r1185441081 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class encapsulates a generic group member's metadata. + * + * Member metadata contains the following: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout + * 2. timestamp of the latest heartbeat + * + * Protocol metadata: + * 1. the list of supported protocols (ordered by preference) + * 2. the metadata associated with each protocol + * + * In addition, it also contains the following state information: + * + * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state, + * its rebalance callback will be kept in the metadata if the + * member has sent the join group request + * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback + *is kept in metadata until the leader provides the group assignment + *and the group transitions to stable + */ +public class GenericGroupMember { + +private static class MemberSummary { +private final String memberId; +private final Optional groupInstanceId; +private final String clientId; +private final String clientHost; +private final byte[] metadata; +private final byte[] assignment; + +public MemberSummary(String memberId, + Optional groupInstanceId, + String clientId, + String clientHost, + byte[] metadata, + byte[] assignment) { + +this.memberId = memberId; +this.groupInstanceId = groupInstanceId; +this.clientId = clientId; +this.clientHost = clientHost; +this.metadata = metadata; +this.assignment = assignment; +} + +public String memberId() { +return memberId; +} + +public Optional getGroupInstanceId() { +return groupInstanceId; +} + +public String clientId() { +return clientId; +} + +public String clientHost() { +return clientHost; +} + +public byte[] metadata() { +return metadata; +} + +public byte[] assignment() { +return assignment; +} + +} + +/** + * The member id. + */ +private final String memberId; + +/** + * The group instance id. + */ +private final Optional groupInstanceId; + +/** + * The client id. + */ +private final String clientId; + +/** + * The client host. 
+ */ +private final String clientHost; + +/** + * The rebalance timeout in milliseconds. + */ +private final int rebalanceTimeoutMs; + +/** + * The session timeout in milliseconds. + */ +private final int sessionTimeoutMs; + +/** + * The protocol type. + */ +private final String protocolType; + +/** + * The list of supported protocols. + */ +private final List supportedProtocols; + +/** + * The assignment stored by the client assignor. + */ +private final byte[] assignment; + +/** + * The callback that is invoked once this member joins the group. + */ +private CompletableFuture awaitingJoinCallback = null; + +/** + * The callback that is invoked once this member completes the sync group phase. + */ +
[GitHub] [kafka] hertzsprung commented on pull request #13671: KAFKA-14967: fix NPE in MockAdminClient CreateTopicsResult
hertzsprung commented on PR #13671: URL: https://github.com/apache/kafka/pull/13671#issuecomment-1535283665 `:clients:test` succeeded locally but fails in CI due to a Gradle configuration problem:
> Could not create task ':clients:test'.
>
> [2023-05-04T16:19:19.121Z] > The Gradle Enterprise Gradle plugin is conflicting with the Test Retry Gradle plugin and has already added a retry extension to the test task test. Please either remove the Test Retry Gradle plugin from this project or disable the registration of the retry extension in the Gradle Enterprise Gradle plugin by specifying the system property 'gradle.enterprise.testretry.enabled' and setting it to 'false'.
[GitHub] [kafka] cmccabe closed pull request #13540: MINOR: improve QuorumController logging
cmccabe closed pull request #13540: MINOR: improve QuorumController logging URL: https://github.com/apache/kafka/pull/13540 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
jeffkbkim commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r1185332905 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class encapsulates a generic group member's metadata. + * + * Member metadata contains the following: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout + * 2. timestamp of the latest heartbeat + * + * Protocol metadata: + * 1. the list of supported protocols (ordered by preference) + * 2. the metadata associated with each protocol + * + * In addition, it also contains the following state information: + * + * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state, + * its rebalance callback will be kept in the metadata if the + * member has sent the join group request + * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback + *is kept in metadata until the leader provides the group assignment + *and the group transitions to stable + */ +public class GenericGroupMember { + +private static class MemberSummary { +private final String memberId; +private final Optional groupInstanceId; +private final String clientId; +private final String clientHost; +private final byte[] metadata; +private final byte[] assignment; + +public MemberSummary(String memberId, + Optional groupInstanceId, + String clientId, + String clientHost, + byte[] metadata, + byte[] assignment) { + +this.memberId = memberId; +this.groupInstanceId = groupInstanceId; +this.clientId = clientId; +this.clientHost = clientHost; +this.metadata = metadata; +this.assignment = assignment; +} + +public String memberId() { +return memberId; +} + +public Optional getGroupInstanceId() { +return groupInstanceId; +} + +public String clientId() { +return clientId; +} + +public String clientHost() { +return clientHost; +} + +public byte[] metadata() { +return metadata; +} + +public byte[] assignment() { +return assignment; +} + +} + +/** + * The member id. + */ +private final String memberId; + +/** + * The group instance id. + */ +private final Optional groupInstanceId; + +/** + * The client id. 
+ */ +private final String clientId; Review Comment: i removed the final keyword and added a setter method in https://github.com/apache/kafka/pull/13663/files#diff-7ec3bedcac3e1a182a8aed96e4e40f0b5ac3295c90bbc862799b7b57e07f983dR494-R499 as it's only used by the group metadata. should i refactor it here or keep it as is and change it in https://github.com/apache/kafka/pull/13663? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compli
[jira] [Updated] (KAFKA-14958) Investigate enforcing all batches have the same producer ID
[ https://issues.apache.org/jira/browse/KAFKA-14958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-14958: --- Description: KAFKA-14916 was created after I incorrectly assumed transaction ID in the produce request indicated all batches were transactional. Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but we decided this can be done in a followup, as we still need to assess if we can enforce this without breaking workloads. In KAFKA-14916 we did enforce this, but only for the partitions we are verifying. This ticket is that followup. was: KAFKA-14916 was created after I incorrectly assumed transaction ID in the produce request indicated all batches were transactional. Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but we decided this can be done in a followup, as we still need to assess if we can enforce this without breaking workloads. This ticket is that followup. > Investigate enforcing all batches have the same producer ID > --- > > Key: KAFKA-14958 > URL: https://issues.apache.org/jira/browse/KAFKA-14958 > Project: Kafka > Issue Type: Task > Reporter: Justine Olshan > Priority: Minor > > KAFKA-14916 was created after I incorrectly assumed transaction ID in the produce request indicated all batches were transactional. > Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but we decided this can be done in a followup, as we still need to assess if we can enforce this without breaking workloads. In KAFKA-14916 we did enforce this, but only for the partitions we are verifying. > This ticket is that followup.
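Concretely, the enforcement under investigation amounts to a single pass over a request's batches. A sketch of such a check, illustrative only, since whether this can be enforced without breaking workloads is exactly what the ticket leaves open:

```java
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

class ProducerIdCheckSketch {
    // Illustrative only: reject a produce request whose batches carry different
    // producer IDs. Not the eventual implementation.
    static void ensureSingleProducerId(MemoryRecords records) {
        Long expected = null;
        for (RecordBatch batch : records.batches()) {
            if (expected == null) {
                expected = batch.producerId(); // the first batch fixes the expected ID
            } else if (batch.producerId() != expected) {
                throw new InvalidRecordException("All batches in a produce request " +
                    "with a transactional ID must share the same producer ID");
            }
        }
    }
}
```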
[jira] [Resolved] (KAFKA-14916) Fix code that assumes transactional ID implies all records are transactional
[ https://issues.apache.org/jira/browse/KAFKA-14916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-14916. Resolution: Fixed > Fix code that assumes transactional ID implies all records are transactional > > > Key: KAFKA-14916 > URL: https://issues.apache.org/jira/browse/KAFKA-14916 > Project: Kafka > Issue Type: Sub-task > Affects Versions: 3.6.0 > Reporter: Justine Olshan > Assignee: Justine Olshan > Priority: Blocker > > KAFKA-14561 wrote code that assumed that if a transactional ID was included, all record batches were transactional and had the same producer ID. This work will improve validation and fix the code that assumes all batches are transactional. Further, KAFKA-14561 will not assume all records are transactional. > Originally this ticket had an action item to ensure all the producer IDs are the same in the batches since we send a single txn ID, but that can be done in a followup KAFKA-14958, as we still need to assess if we can enforce this without breaking workloads.
[jira] [Resolved] (KAFKA-13668) Failed cluster authorization should not be fatal for producer
[ https://issues.apache.org/jira/browse/KAFKA-13668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-13668. Fix Version/s: 3.6.0 Resolution: Fixed > Failed cluster authorization should not be fatal for producer > - > > Key: KAFKA-13668 > URL: https://issues.apache.org/jira/browse/KAFKA-13668 > Project: Kafka > Issue Type: Bug > Reporter: Jason Gustafson > Assignee: Philip Nee > Priority: Major > Fix For: 3.6.0 > > > The idempotent producer fails fatally if the initial `InitProducerId` returns CLUSTER_AUTHORIZATION_FAILED. This makes the producer unusable until a new instance is constructed. For some applications, it is more convenient to keep the producer instance active and let the administrator fix the permission problem instead of going into a crash loop. Additionally, most applications will probably not be smart enough to reconstruct the producer instance, so if the application does not handle the error by failing, users will have to restart the application manually. > I think it would be better to let the producer retry the `InitProducerId` request as long as the user keeps trying to use the producer.
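From the application's side, the resolved behavior means the same producer instance can simply be retried while the ACLs are fixed. A sketch of that usage pattern (the record and the one-second backoff are placeholders):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ClusterAuthorizationException;

import java.util.concurrent.ExecutionException;

class RetryOnAuthFailureSketch {
    // Sketch of the application-side pattern the fix enables: keep the same
    // producer instance and retry while an administrator grants the missing ACLs.
    static RecordMetadata sendWithRetry(KafkaProducer<String, String> producer,
                                        ProducerRecord<String, String> record)
            throws InterruptedException, ExecutionException {
        while (true) {
            try {
                return producer.send(record).get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof ClusterAuthorizationException) {
                    Thread.sleep(1_000); // permissions not granted yet; retry with the same instance
                } else {
                    throw e;
                }
            }
        }
    }
}
```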
[jira] [Commented] (KAFKA-14624) State restoration is broken with standby tasks and cache-enabled stores in processor API
[ https://issues.apache.org/jira/browse/KAFKA-14624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719425#comment-17719425 ] Lucas Brutschy commented on KAFKA-14624: This seems to be a duplicate of https://issues.apache.org/jira/browse/KAFKA-14172 A fix will be included in 3.5. [~balajirrao] can you attempt to reproduce the bug with the latest master and confirm that it's fixed? > State restoration is broken with standby tasks and cache-enabled stores in processor API > > Key: KAFKA-14624 > URL: https://issues.apache.org/jira/browse/KAFKA-14624 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.3.1 > Reporter: Balaji Rao > Priority: Major > > I found that cache-enabled state stores in PAPI with standby tasks sometimes return stale data when a partition moves from one app instance to another and back. [Here's|https://github.com/balajirrao/kafka-streams-multi-runner] a small project that I used to reproduce the issue. > I dug around a bit and it seems like it's a bug in standby task state restoration when caching is enabled. If a partition moves from instance 1 to 2 and then back to instance 1, since the `CachingKeyValueStore` doesn't register a restore callback, it can return potentially stale data for non-dirty keys. > I could fix the issue by modifying the `CachingKeyValueStore` to register a restore callback in which the restored keys are added to the cache. Is this fix in the right direction?
> {code:java}
> // register the store
> context.register(
>     root,
>     (RecordBatchingStateRestoreCallback) records -> {
>         for (final ConsumerRecord<byte[], byte[]> record : records) {
>             put(Bytes.wrap(record.key()), record.value());
>         }
>     }
> );
> {code}
> I would like to contribute a fix, if I can get some help!
[GitHub] [kafka] jolshan merged pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
jolshan merged PR #13607: URL: https://github.com/apache/kafka/pull/13607
[GitHub] [kafka] Hangleton commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
Hangleton commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1535077333 Agree that this change should ideally be confined to the Windows platform.
[GitHub] [kafka] cmccabe commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
cmccabe commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1535052942 I'm going to leave a -1 here because there are a number of issues in the patch and it's not ready to commit yet. I would encourage you to split this patch into several parts and clearly explain what issue each one solves and how. At a high level, the challenge for us is that Windows does things quite differently in some cases, but we don't want to degrade performance on all the other platforms. So a lot of thought is probably required here. Ideally, other platforms would not be impacted at all.
[GitHub] [kafka] jolshan merged pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan merged PR #12149: URL: https://github.com/apache/kafka/pull/12149
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185204551 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. 
It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** +
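The set computation described in the javadoc above is plain set arithmetic over the current and target assignments. A minimal, self-contained sketch for a single topic (illustrative names, not the PR's code):

```java
import java.util.HashSet;
import java.util.Set;

// For one topic: derive the three sets and the resulting state transition.
final class ReconciliationSketch {
    enum State { REVOKING, ASSIGNING, STABLE }

    static State transition(Set<Integer> current, Set<Integer> target) {
        Set<Integer> assigned = new HashSet<>(current);
        assigned.retainAll(target);      // kept: in both the current and target assignment
        Set<Integer> revoking = new HashSet<>(current);
        revoking.removeAll(target);      // owned, but no longer in the target
        Set<Integer> assigning = new HashSet<>(target);
        assigning.removeAll(current);    // wanted, but possibly still owned elsewhere
        if (!revoking.isEmpty()) return State.REVOKING;
        if (!assigning.isEmpty()) return State.ASSIGNING;
        return State.STABLE;
    }
}
```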
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185195017 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. + * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: + * This is the initial state of the state machine. The state machine starts here when the next epoch + * does not match the target epoch. It means that a new target assignment has been installed so the + * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are + * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not + * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. + * + * - REVOKING: + * This state means that the member must revoke partitions before it can transition to the next epoch + * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions + * are committed with the current epoch. 
The member transitions to the next state only when it has + * acknowledged the revocation. + * + * - ASSIGNING: + * This state means that the member waits on partitions which are still owned by other members in the + * group. It remains in this state until they are all freed up. + * + * - STABLE: + * This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** + * Constructs the CurrentAssignmentBuilder based on the current state of the + * provided consumer group member. + * + * @param member The consumer group member that must be reconciled. + */ +public CurrentAssignmentBui
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185193708 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. + * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: + * This is the initial state of the state machine. The state machine starts here when the next epoch + * does not match the target epoch. It means that a new target assignment has been installed so the + * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are + * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not + * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. + * + * - REVOKING: + * This state means that the member must revoke partitions before it can transition to the next epoch + * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions + * are committed with the current epoch. 
The member transitions to the next state only when it has + * acknowledged the revocation. + * + * - ASSIGNING: + * This state means that the member waits on partitions which are still owned by other members in the + * group. It remains in this state until they are all freed up. + * + * - STABLE: + * This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** + * Constructs the CurrentAssignmentBuilder based on the current state of the + * provided consumer group member. + * + * @param member The consumer group member that must be reconciled. + */ +public CurrentAssignmentBui
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185188619 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. 
It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** +
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185172881 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch - The current epoch of the member. + * - Next Epoch - The desired epoch of the member. It corresponds to the epoch of + *the target/desired assignment. The member transitions to this epoch + *when it has revoked the partitions that it does not own or if it + *does not have to revoke any. + * - Previous Epoch - The previous epoch of the member when the state was updated. + * - Assigned Set - The set of partitions currently assigned to the member. This represents what + *the member should have. + * - Revoking Set - The set of partitions that the member should revoke before it can transition + *to the next state. + * - Assigning Set - The set of partitions that the member will eventually receive. The partitions + *in this set are still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT - This is the initial state of the state machine. The state machine starts + * here when the next epoch does not match the target epoch. It means that + * a new target assignment has been installed so the reconciliation process + * must restart. In this state, the Assigned, Revoking and Assigning sets + * are computed. If Revoking is not empty, the state machine transitions + * to REVOKE; if Assigning is not empty, it transitions to ASSIGNING; + * otherwise it transitions to STABLE. + * - REVOKE- This state means that the member must revoke partitions before it can + * transition to the next epoch and thus start receiving new partitions. + * The member transitions to the next state only when it has acknowledged + * the revocation. + * - ASSIGNING - This state means that the member waits on partitions which are still + * owned by other members in the group. 
It remains in this state until + * they are all freed up. + * - STABLE- This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/**
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jeffkbkim commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1185168753 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. + * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The previous epoch of the member when the state was updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: + * This is the initial state of the state machine. The state machine starts here when the next epoch + * does not match the target epoch. It means that a new target assignment has been installed so the + * reconciliation process must restart. In this state, the Assigned, Revoking and Assigning sets are + * computed. If Revoking is not empty, the state machine transitions to REVOKING; if Assigning is not + * empty, it transitions to ASSIGNING; otherwise it transitions to STABLE. + * + * - REVOKING: + * This state means that the member must revoke partitions before it can transition to the next epoch + * and thus start receiving new partitions. This is to guarantee that offsets of revoked partitions + * are committed with the current epoch. 
The member transitions to the next state only when it has + * acknowledged the revocation. + * + * - ASSIGNING: + * This state means that the member waits on partitions which are still owned by other members in the + * group. It remains in this state until they are all freed up. + * + * - STABLE: + * This state means that the member has received all its assigned partitions. + */ +public class CurrentAssignmentBuilder { +/** + * The consumer group member which is reconciled. + */ +private final ConsumerGroupMember member; + +/** + * The target assignment epoch. + */ +private int targetAssignmentEpoch; + +/** + * The target assignment. + */ +private Assignment targetAssignment; + +/** + * A function which returns the current epoch of a topic-partition or -1 if the + * topic-partition is not assigned. The current epoch is the epoch of the current owner. + */ +private BiFunction currentPartitionEpoch; + +/** + * The partitions owned by the consumer. This is directly provided by the member in the + * ConsumerGroupHeartbeat request. + */ +private List ownedTopicPartitions; + +/** + * Constructs the CurrentAssignmentBuilder based on the current state of the + * provided consumer group member. + * + * @param member The consumer group member that must be reconciled. + */ +public CurrentAssignmen
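One plausible reading of the acknowledgment rule quoted above: revocation is acknowledged once the partitions the member still reports owning in its ConsumerGroupHeartbeat no longer intersect its pending-revocation set. A sketch under that assumption, with an invented helper name:

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;

import java.util.List;
import java.util.Map;
import java.util.Set;

class RevocationSketch {
    // Invented helper: true once no partition the member still reports owning
    // remains in its pending-revocation set.
    static boolean revocationAcknowledged(
            List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
            Map<Uuid, Set<Integer>> partitionsPendingRevocation) {
        for (ConsumerGroupHeartbeatRequestData.TopicPartitions owned : ownedTopicPartitions) {
            Set<Integer> revoking = partitionsPendingRevocation.get(owned.topicId());
            if (revoking == null) continue;
            for (Integer partition : owned.partitions()) {
                if (revoking.contains(partition)) return false;
            }
        }
        return true;
    }
}
```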
[GitHub] [kafka] clolov commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
clolov commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1185137613 ## core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala: ## @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.server + +import kafka.server.KafkaApisTest.{NameAndId, newOffsetCommitRequestData, newOffsetCommitResponseData} +import kafka.server.{BaseRequestTest, KafkaConfig} +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.OffsetCommitRequestTest.assertResponseEquals +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetCommitResponse} +import org.apache.kafka.common.utils.Utils +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} + +import java.util.Optional.empty +import java.util.Properties +import scala.collection.{Map, Seq} +import scala.collection.immutable.ListMap +import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, SeqHasAsJava} + +class OffsetCommitRequestTest extends BaseRequestTest { + override def brokerCount: Int = 1 + + val brokerId: Integer = 0 + val offset = 15L + val groupId = "groupId" + + var consumer: KafkaConsumer[_, _] = _ + + override def brokerPropertyOverrides(properties: Properties): Unit = { +properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) +properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") +properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") + } + + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { +super.setUp(testInfo) +consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) +consumer = createConsumer() + } + + @AfterEach + override def tearDown(): Unit = { +if (consumer != null) + Utils.closeQuietly(consumer, "KafkaConsumer") +super.tearDown() + } + + def createTopics(topicNames: String*): Seq[NameAndId] = { +topicNames.map(topic => { + createTopic(topic) + val topicId: Uuid = getTopicIds().get(topic) match { +case Some(x) => x +case _ => throw new AssertionError("Topic ID not found for " + topic) + } + NameAndId(topic, topicId) +}) + } + + + @Test + def testTopicIdsArePopulatedInOffsetCommitResponses(): Unit = { +val topics = createTopics("topic1", "topic2", "topic3") Review Comment: Sorry, could you elaborate, because I am not certain I follow? `getTopicIds(...)` will return a map but only if the topics requested have been created first. Are you suggesting that since all tests create these three topics we move the creation to the setup method and then we use `getTopicIds` everywhere else? 
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1185104750 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; 
this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
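The two modes named in the new javadoc come down to whether the caller waits on the Future returned by send. In miniature (a sketch, not the example's exact code; topic and values are placeholders):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;

class SendModesSketch {
    static void sendBothWays(KafkaProducer<Integer, String> producer)
            throws InterruptedException, ExecutionException {
        ProducerRecord<Integer, String> record = new ProducerRecord<>("my-topic", 0, "test0");

        // Async mode: send() returns immediately; the callback runs on the broker response.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) exception.printStackTrace();
            else System.out.println("acked at offset " + metadata.offset());
        });

        // Sync mode: block on the returned Future until the response (or error) arrives.
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("synchronously acked at offset " + metadata.offset());
    }
}
```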
[GitHub] [kafka] lucasbru commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
lucasbru commented on code in PR #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r1185039728
## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
## @@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 maybeThrowTaskExceptions(taskCloseExceptions);
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());
Review Comment: In the state updater code path, we would immediately go back to processing / committing the active (previously existing) tasks, so I am not sure the problem being solved here is still relevant there. But I may be missing something; if the problem does apply, we should probably only execute the commit when the state updater code path is not enabled.
[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
dajac commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r118500 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class encapsulates a generic group member's metadata. + * + * Member metadata contains the following: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout + * 2. timestamp of the latest heartbeat + * + * Protocol metadata: + * 1. the list of supported protocols (ordered by preference) + * 2. the metadata associated with each protocol + * + * In addition, it also contains the following state information: + * + * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state, + * its rebalance callback will be kept in the metadata if the + * member has sent the join group request + * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback + *is kept in metadata until the leader provides the group assignment + *and the group transitions to stable + */ +public class GenericGroupMember { + +private static class MemberSummary { +private final String memberId; +private final Optional groupInstanceId; +private final String clientId; +private final String clientHost; +private final byte[] metadata; +private final byte[] assignment; + +public MemberSummary(String memberId, + Optional groupInstanceId, + String clientId, + String clientHost, + byte[] metadata, + byte[] assignment) { + +this.memberId = memberId; +this.groupInstanceId = groupInstanceId; +this.clientId = clientId; +this.clientHost = clientHost; +this.metadata = metadata; +this.assignment = assignment; +} + +public String memberId() { +return memberId; +} + +public Optional getGroupInstanceId() { +return groupInstanceId; +} + +public String clientId() { +return clientId; +} + +public String clientHost() { +return clientHost; +} + +public byte[] metadata() { +return metadata; +} + +public byte[] assignment() { +return assignment; +} + +} + +/** + * The member id. + */ +private final String memberId; + +/** + * The group instance id. + */ +private final Optional groupInstanceId; + +/** + * The client id. + */ +private final String clientId; + +/** + * The client host. 
+ */ +private final String clientHost; + +/** + * The rebalance timeout in milliseconds. + */ +private final int rebalanceTimeoutMs; + +/** + * The session timeout in milliseconds. + */ +private final int sessionTimeoutMs; + +/** + * The protocol type. + */ +private final String protocolType; + +/** + * The list of supported protocols. + */ +private final List supportedProtocols; + +/** + * The assignment stored by the client assignor. + */ +private final byte[] assignment; + +/** + * The callback that is invoked once this member joins the group. + */ +private CompletableFuture awaitingJoinCallback = null; Review Comment: nit: Should we call this one `awaitingJoinCallbackFuture`? ## group-coordinator/s
[GitHub] [kafka] clolov commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9
clolov commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1185002319
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
## @@ -129,16 +150,19 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 public abstract class ConsumerCoordinatorTest {
-private final String topic1 = "test1";
-private final String topic2 = "test2";
-private final TopicPartition t1p = new TopicPartition(topic1, 0);
-private final TopicPartition t2p = new TopicPartition(topic2, 0);
-private final String groupId = "test-group";
+private static String topic1 = "test1";
+private static String topic2 = "test2";
+private static TopicPartition t1p = new TopicPartition(topic1, 0);
+private static TopicIdPartition ti1p = new TopicIdPartition(Uuid.randomUuid(), t1p);
Review Comment: I believe t1p is abstracted because it is being used in 175 other places in this test class for test setup and assertions.
[GitHub] [kafka] machi1990 commented on pull request #13673: MINOR: Update dependencies (minor versions only)
machi1990 commented on PR #13673: URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534757023 > > In relation to dependency upgrade, has there been any discussion around automated tooling, e.g. usage of dependabot or renovate? > > I don't know. I have seen @ijuma being the one who periodically performs dependency upgrades. He may be able to provide more info about this. Thanks, I'll be interested in any details that could be provided @ijuma > Dependabot is a good idea (and some other Apache communities use it), except when it leads to noise. I don't know if there is a way to "mute" it and enable it only at the beginning of a release cycle. Yes, it is possible. With Dependabot you can limit the number of PRs opened; setting the limit to `0` effectively disables dependency updates for a given package ecosystem. Renovate has a disabling flag, which could be used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
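For reference, the per-ecosystem PR limit mentioned above lives in `.github/dependabot.yml`. A minimal sketch of what such a configuration might look like (values are illustrative, not a proposal for the Kafka repo):

```yaml
version: 2
updates:
  - package-ecosystem: "gradle"
    directory: "/"
    schedule:
      interval: "weekly"
    # A limit of 0 means no update PRs are opened, effectively muting
    # Dependabot for this ecosystem until the limit is raised again.
    open-pull-requests-limit: 0
```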
[GitHub] [kafka] divijvaidya commented on pull request #13673: MINOR: Update dependencies (minor versions only)
divijvaidya commented on PR #13673: URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534729885 > In relation to dependency upgrade, has there been any discussion around automated tooling, e.g. usage of dependabot or renovate? I don't know. I have seen @ijuma being the one who periodically performs dependency upgrades. He may be able to provide more info about this. Dependabot is a good idea (and some other Apache communities use it), except when it leads to noise. I don't know if there is a way to "mute" it and enable it only at the beginning of a release cycle. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request, #13675: KAFKA-14462; [14/N]; Add PartitionWriter
dajac opened a new pull request, #13675: URL: https://github.com/apache/kafka/pull/13675 This patch introduces the `PartitionWriter` interface in the `group-coordinator` module. The `ReplicaManager` resides in the `core` module and is thus not accessible from the `group-coordinator` one. `PartitionWriterImpl` is an implementation of the interface that resides in `core` and delegates to the `ReplicaManager`. One notable difference from the usual produce path is that the `PartitionWriter` returns the offset following the written records. This is then used by the coordinator runtime (coming later) to track when the request associated with the write can be completed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
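Since the PR body only describes the interface, here is a hedged sketch of its shape as described above; the actual signatures in #13675 may differ (the record type parameter and the method name are assumptions):

```java
import java.util.List;
import org.apache.kafka.common.TopicPartition;

// Hypothetical shape of the PartitionWriter described in the PR body.
public interface PartitionWriter<T> {
    /**
     * Appends records to the given partition and returns the log end offset
     * after the write. The coordinator runtime can use that offset to decide
     * when the request associated with the write can be completed.
     */
    long append(TopicPartition topicPartition, List<T> records);
}
```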
[GitHub] [kafka] divijvaidya commented on pull request #13673: MINOR: Update dependencies (minor versions only)
divijvaidya commented on PR #13673: URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534725676 Thanks for your comment @machi1990. In principle what you say is right, but given the limited committer bandwidth in the community, I am trying to optimize for code-reviewer comfort right now. That is why I have intentionally added only the non-controversial upgrades here in the PR. In case a rollback is needed, we can always choose to roll forward instead by modifying the version of the specific dependency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] machi1990 commented on pull request #13673: MINOR: Update dependencies (minor versions only)
machi1990 commented on PR #13673: URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534706839 In relation to dependency upgrade, has there been any discussion around automated tooling, e.g. usage of dependabot or renovate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] machi1990 commented on pull request #13673: MINOR: Update dependencies (minor versions only)
machi1990 commented on PR #13673: URL: https://github.com/apache/kafka/pull/13673#issuecomment-1534704953 Thanks @divijvaidya. I am wondering whether it would be best to split each upgrade into a separate PR? That would make each dependency update atomic and thus easier to revert in case we notice an issue related to a specific dependency upgrade. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya opened a new pull request, #13674: MINOR: Release resources in FetcherTest
divijvaidya opened a new pull request, #13674: URL: https://github.com/apache/kafka/pull/13674 FetcherTest fails intermittently with OOM since it doesn't close the spied resources properly. ``` org.gradle.api.internal.tasks.testing.TestSuiteExecutionException: Could not complete execution for Gradle Test Executor 1565. at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:64) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193) at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65) at app//worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) at app//worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded at java.base/java.lang.invoke.MethodHandleImpl.makePairwiseConvertByEditor(MethodHandleImpl.java:318) at java.base/java.lang.invoke.MethodHandleImpl.makePairwiseConvert(MethodHandleImpl.java:262) at java.base/java.lang.invoke.MethodHandleImpl.makePairwiseConvert(MethodHandleImpl.java:379) at java.base/java.lang.invoke.MethodHandle.asTypeUncached(MethodHandle.java:885) at java.base/java.lang.invoke.MethodHandle.asType(MethodHandle.java:869) at java.base/java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.isUnavailable(ConsumerNetworkClient.java:559) at org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:461) at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:72) at org.apache.kafka.clients.consumer.internals.FetcherTest.sendFetches(FetcherTest.java:236) at org.apache.kafka.clients.consumer.internals.FetcherTest.testFetcherConcurrency(FetcherTest.java:2945) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
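The fix implies the usual lifecycle pattern for heavyweight spied fixtures. A minimal, self-contained sketch of that pattern under assumed names (`HeavyResource` is a hypothetical stand-in, not a class in FetcherTest):

```java
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.mockito.Mockito.spy;

class SpiedResourceLifecycleTest {
    // Hypothetical stand-in for a heavyweight fixture such as a fetcher.
    static class HeavyResource implements AutoCloseable {
        @Override
        public void close() { /* release buffers, threads, etc. */ }
    }

    private HeavyResource spiedResource;

    @Test
    void exerciseResource() {
        spiedResource = spy(new HeavyResource());
        // ... test body using the spy ...
    }

    @AfterEach
    void tearDown() throws Exception {
        // Closing the spy after each test keeps state from accumulating
        // across a large test class and exhausting the heap.
        if (spiedResource != null) {
            spiedResource.close();
        }
    }
}
```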
[GitHub] [kafka] divijvaidya opened a new pull request, #13673: MINOR: Update dependencies (minor versions only)
divijvaidya opened a new pull request, #13673: URL: https://github.com/apache/kafka/pull/13673 All dependency upgrades in the PR are minor upgrades with backward compatible changes. Note that no major versions of dependencies have been changed, to keep this a low risk change (and possibly merge it to 3.5). There are separate PRs such as https://github.com/apache/kafka/pull/13662 which will upgrade the major versions. ## Release notes for dependencies: ### bcpkix Release notes: https://www.bouncycastle.org/releasenotes.html#r1rv73 ### httpclient Release notes: https://downloads.apache.org/httpcomponents/httpclient/RELEASE_NOTES-4.5.x.txt ### jackson and jackson-databind Release notes: https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.13.5 ### jacoco Release notes: https://github.com/jacoco/jacoco/releases ### javassist Release notes: https://github.com/jboss-javassist/javassist/releases ### jetty Release notes: https://github.com/eclipse/jetty.project/releases?q=9.4.51&expanded=true ### jersey Release notes: https://github.com/eclipse-ee4j/jersey/releases ### jline Release notes: https://github.com/jline/jline3/releases ### jaxb Can't find release notes, but https://mvnrepository.com/artifact/javax.xml.bind/jaxb-api/2.3.1 is the latest version in Maven. ### junit Release notes: https://junit.org/junit5/docs/current/release-notes/index.html#release-notes-5.9.3 ### mockito Release notes: https://github.com/mockito/mockito/releases?q=v4.11.0&expanded=true ### netty Release notes: https://netty.io/news/2023/04/25/4-1-92-Final.html ### reload4j Release notes: https://reload4j.qos.ch/ ### scalaCollectionCompat Release notes: https://github.com/scala/scala-collection-compat/releases?q=2.10.0&expanded=true ### scoverage Release notes: https://github.com/scoverage/sbt-scoverage/releases?page=2 ## Compatibility - Verified build and test with JDK8 and JDK 17 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14817) LogCleaner mark some partitions of __consumer_offsets as uncleanable
[ https://issues.apache.org/jira/browse/KAFKA-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719288#comment-17719288 ] Sergey Ivanov commented on KAFKA-14817: --- [~showuon], thank you for the explanation! For the moment we "fixed" the issue with uncleaned segments by temporarily setting a "delete" policy on the problem environment, and there are no more uncleaned partitions. But I'll try to find the same issue on other, non-sensitive environments, and if I find it I'll attach a dump. > LogCleaner mark some partitions of __consumer_offsets as uncleanable > > > Key: KAFKA-14817 > URL: https://issues.apache.org/jira/browse/KAFKA-14817 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.1 >Reporter: ZhenChun Pan >Priority: Major > > We find some partitions of topic __consumer_offsets can't clean their logs > any more and take up a lot of disk space. Then we found these partitions of > topic __consumer_offsets are marked as uncleanable in log-cleaner.log. The > logs below: > [2023-03-17 17:53:46,655] INFO Starting the log cleaner (kafka.log.LogCleaner) > [2023-03-17 17:53:46,770] INFO [kafka-log-cleaner-thread-0]: Starting > (kafka.log.LogCleaner) > [2023-03-17 17:53:46,841] INFO Cleaner 0: Beginning cleaning of log > __consumer_offsets-24. (kafka.log.LogCleaner) > [2023-03-17 17:53:46,841] INFO Cleaner 0: Building offset map for > __consumer_offsets-24... (kafka.log.LogCleaner) > [2023-03-17 17:53:47,013] INFO Cleaner 0: Building offset map for log > __consumer_offsets-24 for 5 segments in offset range [0, 2360519). > (kafka.log.LogCleaner) > [2023-03-17 17:53:47,394] INFO Cleaner 0: Growing cleaner I/O buffers from > 262144 bytes to 524288 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,395] INFO Cleaner 0: Growing cleaner I/O buffers from > 524288 bytes to 1048576 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,396] INFO Cleaner 0: Growing cleaner I/O buffers from > 1048576 bytes to 2097152 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,401] INFO Cleaner 0: Growing cleaner I/O buffers from > 2097152 bytes to 4194304 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,409] INFO Cleaner 0: Growing cleaner I/O buffers from > 4194304 bytes to 8388608 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,434] INFO Cleaner 0: Growing cleaner I/O buffers from > 8388608 bytes to 10485772 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,465] WARN [kafka-log-cleaner-thread-0]: Unexpected > exception thrown when cleaning log > Log(dir=/opt/kafka-service/data/__consumer_offsets-24, > topic=__consumer_offsets, partition=24, highWatermark=0, lastStableOffset=0, > logStartOffset=0, logEndOffset=2759760). 
Marking its partition > (__consumer_offsets-24) as uncleanable (kafka.log.LogCleaner) > kafka.log.LogCleaningException: Batch size 223 < buffer size 10485772, but > not processed for log segment > /opt/kafka-service/data/__consumer_offsets-24/.log at > position 31457091 > at > kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:356) > at > kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:321) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > Caused by: java.lang.IllegalStateException: Batch size 223 < buffer size > 10485772, but not processed for log segment > /opt/kafka-service/data/__consumer_offsets-24/.log at > position 31457091 > at kafka.log.Cleaner.growBuffersOrFail(LogCleaner.scala:745) > at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:983) > at kafka.log.Cleaner.$anonfun$buildOffsetMap$5(LogCleaner.scala:908) > at > kafka.log.Cleaner.$anonfun$buildOffsetMap$5$adapted(LogCleaner.scala:904) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984) > at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:904) > at kafka.log.Cleaner.doClean(LogCleaner.scala:523) > at kafka.log.Cleaner.clean(LogCleaner.scala:511) > at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:380) > at > kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:352) > ... 3 more > [2023-03-17 17:54:02,477] INFO Cleaner 0: Beginning cl
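A note on the temporary workaround described above: switching the cleanup policy can be done with `kafka-configs.sh`. A sketch with a placeholder broker address (and assuming the affected topic is __consumer_offsets, as in this report):

{code:bash}
# Temporarily let retention delete the stuck segments
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name __consumer_offsets \
  --add-config cleanup.policy=delete
# ...wait for retention to remove the old segments, then restore compaction
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name __consumer_offsets \
  --add-config cleanup.policy=compact
{code}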
[jira] [Comment Edited] (KAFKA-13392) Timeout Exception triggering reassign partitions with --bootstrap-server option
[ https://issues.apache.org/jira/browse/KAFKA-13392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719274#comment-17719274 ] Jonas Lundholm Bertelsen edited comment on KAFKA-13392 at 5/4/23 11:12 AM: --- We have seen this same issue with a Kafka 3.3 cluster where one broker is down and we want to reassign replicas away from it. The `--zookeeper` argument is no longer available, so we were not able to use that workaround. An alternative workaround is to simply omit the throttle parameter. This obviously has its own drawbacks. In our case we split the reassignment file into smaller parts and applied them one after the other to try and put a "manual throttle" on activity spikes. was (Author: JIRAUSER300177): We have seen this same issue with a Kafka 3.4 cluster where one broker is down and we want to reassign replicas away from it. The `--zookeeper` argument is no longer available, so we were not able to use that workaround. An alternative workaround is to simply omit the throttle parameter. This obviously has its own drawbacks. In our case we split the reassignment file into smaller parts and applied them one after the other to try and put a "manual throttle" on activity spikes. > Timeout Exception triggering reassign partitions with --bootstrap-server > option > --- > > Key: KAFKA-13392 > URL: https://issues.apache.org/jira/browse/KAFKA-13392 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0 >Reporter: Yevgeniy Korin >Priority: Minor > > *Scenario when we faced with this issue:* > One of three brokers is down. Add another (fourth) broker and try to > reassign partitions using '--bootstrap-server' > option. > *What's failed:* > {code:java} > /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server > xxx.xxx.xxx.xxx:9092 --reassignment-json-file > /tmp/reassignment-20211021130718.json --throttle 1 --execute{code} > failed with > {code:java} > Error: org.apache.kafka.common.errors.TimeoutException: > Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, > nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: > Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, > nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.ReassignPartitionsCommand$.modifyInterBrokerThrottle(ReassignPartitionsCommand.scala:1435) > at > kafka.admin.ReassignPartitionsCommand$.modifyReassignmentThrottle(ReassignPartitionsCommand.scala:1412) > at > kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:974) > at > kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:255) > at > kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:216) > at > kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala) > Caused by: org.apache.kafka.common.errors.TimeoutException: > Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, > nextAllowedTryMs=1634811369356) timed 
out at 1634811369256 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out > waiting for a node assignment. Call: incrementalAlterConfigs{code} > *Expected behavior:* > partition reassignment process started. > *Workaround:* > Trigger partition reassignment process using '--zookeeper' option: > {code:java} > /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper > zookeeper.my.company:2181/kafka-cluster --reassignment-json-file > /tmp/reassignment-20211021130718.json --throttle 1 --execute{code} > *Additional info:* > We are able to trigger partition reassignment using '--bootstrap-server' > option with no exceptions when all four brokers are alive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
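For reference, the file passed via `--reassignment-json-file` uses the standard reassignment format sketched below (topic names and broker ids are placeholders); the "split into smaller parts" workaround mentioned above simply means distributing the entries of the `partitions` array across several such files:

{code:json}
{
  "version": 1,
  "partitions": [
    {"topic": "example-topic", "partition": 0, "replicas": [1, 2, 3]},
    {"topic": "example-topic", "partition": 1, "replicas": [2, 3, 4]}
  ]
}
{code}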
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1184873142 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -670,6 +875,14 @@ public void close() { } catch (InterruptedException e) { // ignore } +remoteStorageReaderThreadPool.shutdownNow(); +// waits for 2 mins to terminate the current tasks +try { +remoteStorageReaderThreadPool.awaitTermination(2, TimeUnit.MINUTES); Review Comment: That code, `lifecycleManager.controlledShutdownFuture`, is about processing the controlled-shutdown event to the controller for that broker. It will wait for 5 mins before proceeding with the other sequence of actions, but it will not be affected by the code introduced here. The log subsystem handles unclean shutdown for log segments, and it would already have finished before RemoteLogManager is closed, so the log segments will not be affected by this timeout either. But we can use a short duration here, like 10 secs, and revisit introducing a config if one is really needed for closing the remote log subsystem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
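The shutdown sequence under discussion is the standard executor teardown pattern. A self-contained sketch for context (the pool and timeout values are illustrative, not the RemoteLogManager code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReaderPoolShutdown {
    static void close(ExecutorService pool, long timeoutSecs) {
        pool.shutdownNow(); // interrupt in-flight reads
        try {
            if (!pool.awaitTermination(timeoutSecs, TimeUnit.SECONDS)) {
                System.err.println("Reader pool did not terminate within " + timeoutSecs + "s");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }

    public static void main(String[] args) {
        close(Executors.newFixedThreadPool(4), 10);
    }
}
```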
[jira] [Commented] (KAFKA-13392) Timeout Exception triggering reassign partitions with --bootstrap-server option
[ https://issues.apache.org/jira/browse/KAFKA-13392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719274#comment-17719274 ] Jonas Lundholm Bertelsen commented on KAFKA-13392: -- We have seen this same issue with a Kafka 3.4 cluster where one broker is down and we want to reassign replicas away from it. The `--zookeeper` argument is no longer available, so we were not able to use that workaround. An alternative workaround is to simply omit the throttle parameter. This obviously has its own drawbacks. In our case we split the reassignment file into smaller parts and applied them one after the other to try and put a "manual throttle" on activity spikes. > Timeout Exception triggering reassign partitions with --bootstrap-server > option > --- > > Key: KAFKA-13392 > URL: https://issues.apache.org/jira/browse/KAFKA-13392 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0 >Reporter: Yevgeniy Korin >Priority: Minor > > *Scenario when we faced with this issue:* > One of three brokers is down. Add another (fourth) broker and try to > reassign partitions using '--bootstrap-server' > option. > *What's failed:* > {code:java} > /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server > xxx.xxx.xxx.xxx:9092 --reassignment-json-file > /tmp/reassignment-20211021130718.json --throttle 1 --execute{code} > failed with > {code:java} > Error: org.apache.kafka.common.errors.TimeoutException: > Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, > nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: > Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, > nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.ReassignPartitionsCommand$.modifyInterBrokerThrottle(ReassignPartitionsCommand.scala:1435) > at > kafka.admin.ReassignPartitionsCommand$.modifyReassignmentThrottle(ReassignPartitionsCommand.scala:1412) > at > kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:974) > at > kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:255) > at > kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:216) > at > kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala) > Caused by: org.apache.kafka.common.errors.TimeoutException: > Call(callName=incrementalAlterConfigs, deadlineMs=1634811369255, tries=1, > nextAllowedTryMs=1634811369356) timed out at 1634811369256 after 1 attempt(s) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out > waiting for a node assignment. Call: incrementalAlterConfigs{code} > *Expected behavior:* > partition reassignment process started. 
> *Workaround:* > Trigger partition reassignment process using '--zookeeper' option: > {code:java} > /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper > zookeeper.my.company:2181/kafka-cluster --reassignment-json-file > /tmp/reassignment-20211021130718.json --throttle 1 --execute{code} > *Additional info:* > We are able to trigger partition reassignment using '--bootstrap-server' > option with no exceptions when all four brokers are alive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya opened a new pull request, #13672: MINOR: Print the cause of failure for PlaintextAdminIntegrationTest
divijvaidya opened a new pull request, #13672: URL: https://github.com/apache/kafka/pull/13672 # Motivation PlaintextAdminIntegrationTest fails in a flaky manner with the following trace (e.g. in [this build](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13670/3/tests/)): ``` org.opentest4j.AssertionFailedError: expected: <false> but was: <true> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertFalse.failNotFalse(AssertFalse.java:63) at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:36) at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:31) at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:228) at kafka.api.PlaintextAdminIntegrationTest.testElectUncleanLeadersForOnePartition(PlaintextAdminIntegrationTest.scala:1583) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ``` The std output doesn't contain information we could use to debug the cause of the failure. This is because the test currently only checks whether an exception was captured and fails when one is present; it does not print what the exception is. # Change 1. Make the test a bit more robust by waiting for server startup. 2. Fail the test with the actual unexpected exception, which will help in debugging the cause of failure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
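The second change amounts to surfacing the captured Throwable. A minimal sketch with hypothetical helper names (the actual test wires this differently):

```java
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.fail;

class FailWithCause {
    // Instead of assertFalse(captured.isPresent()), fail with the Throwable
    // so the test report includes the root cause and its stack trace.
    static void assertNoUnexpectedException(Optional<Throwable> captured) {
        captured.ifPresent(e -> fail("Unexpected exception", e));
    }
}
```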
[GitHub] [kafka] hertzsprung opened a new pull request, #13671: KAFKA-14967: fix NPE in CreateTopicsResult
hertzsprung opened a new pull request, #13671: URL: https://github.com/apache/kafka/pull/13671 Instead of passing `null` to the future, pass a populated `TopicAndMetadataConfig`. `MockAdminClient` has no tests that I'm aware of. The contribution is my original work and I license the work to the project under the project's open source license. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
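The NPE mechanism (detailed in KAFKA-14967 further below) can be reproduced in isolation. This sketch is analogous in spirit to the bug, not the actual MockAdminClient code:

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;

public class NullCompletionDemo {
    public static void main(String[] args) throws InterruptedException {
        KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
        future.complete(null); // completing with null, as MockAdminClient did
        // The derived future applies the function to the (null) value...
        KafkaFuture<Integer> derived = future.thenApply(String::length);
        try {
            derived.get();
        } catch (ExecutionException e) {
            // ...so the NPE surfaces as the cause when we call get().
            System.out.println("cause: " + e.getCause());
        }
    }
}
```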
[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
showuon commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1184835821 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; 
this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
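The diff above is truncated before the send helpers. A hedged sketch of what the two modes it references boil down to (the helper names follow the diff, the bodies are illustrative):

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class SendModes {
    // Async mode: returns immediately; the callback runs on the producer I/O thread.
    static void asyncSend(KafkaProducer<Integer, String> producer, String topic, int key, String value) {
        producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("Sent %d to %s-%d@%d%n",
                    key, metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }

    // Sync mode: blocks until the broker acknowledges the record.
    static RecordMetadata syncSend(KafkaProducer<Integer, String> producer, String topic, int key, String value)
            throws ExecutionException, InterruptedException {
        return producer.send(new ProducerRecord<>(topic, key, value)).get();
    }
}
```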
[jira] [Comment Edited] (KAFKA-14817) LogCleaner mark some partitions of __consumer_offsets as uncleanable
[ https://issues.apache.org/jira/browse/KAFKA-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719250#comment-17719250 ] Luke Chen edited comment on KAFKA-14817 at 5/4/23 10:11 AM: [~polaris.alioth] [~mrMigles], thanks for reporting this issue. The error means we cannot recognize the batch at that log position (maybe corrupted), so we cannot proceed with compaction for this partition's logs. Like pointed out in this article [https://luppeng.wordpress.com/2022/08/21/possible-reasons-why-a-kafka-topic-is-not-being-compacted/] (point 3), letting Kafka do log recovery by itself should be able to fix it. As for how we can fix this permanently, I have to think about it. But before that, could you help verify whether the log segments that appear in the log are indeed corrupted? You can verify it by running this command to dump the uncleanable logs: {code:java} ./bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/quickstart-events-0/.log{code} Of course, if it's allowed, it would be better if you could upload the uncleanable logs for me to investigate. But I can understand if it's not permitted. Thanks. was (Author: showuon): [~polaris.alioth] [~mrMigles], thanks for reporting this issue. The error means we cannot recognize the batch at that log position (maybe corrupted), so we cannot proceed with compaction for this partition's logs. Like [~mrMigles] pointed out, letting Kafka do log recovery by itself should be able to fix it. As for how we can fix this permanently, I have to think about it. But before that, could you help verify whether the log segments that appear in the log are indeed corrupted? You can verify it by running this command to dump the uncleanable logs: {code:java} ./bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/quickstart-events-0/.log{code} Of course, if it's allowed, it would be better if you could upload the uncleanable logs for me to investigate. But I can understand if it's not permitted. Thanks. > LogCleaner mark some partitions of __consumer_offsets as uncleanable > > > Key: KAFKA-14817 > URL: https://issues.apache.org/jira/browse/KAFKA-14817 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.1 >Reporter: ZhenChun Pan >Priority: Major > > We find some partitions of topic __consumer_offsets can't clean their logs > any more and take up a lot of disk space. Then we found these partitions of > topic __consumer_offsets are marked as uncleanable in log-cleaner.log. The > logs below: > [2023-03-17 17:53:46,655] INFO Starting the log cleaner (kafka.log.LogCleaner) > [2023-03-17 17:53:46,770] INFO [kafka-log-cleaner-thread-0]: Starting > (kafka.log.LogCleaner) > [2023-03-17 17:53:46,841] INFO Cleaner 0: Beginning cleaning of log > __consumer_offsets-24. (kafka.log.LogCleaner) > [2023-03-17 17:53:46,841] INFO Cleaner 0: Building offset map for > __consumer_offsets-24... (kafka.log.LogCleaner) > [2023-03-17 17:53:47,013] INFO Cleaner 0: Building offset map for log > __consumer_offsets-24 for 5 segments in offset range [0, 2360519). > (kafka.log.LogCleaner) > [2023-03-17 17:53:47,394] INFO Cleaner 0: Growing cleaner I/O buffers from > 262144 bytes to 524288 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,395] INFO Cleaner 0: Growing cleaner I/O buffers from > 524288 bytes to 1048576 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,396] INFO Cleaner 0: Growing cleaner I/O buffers from > 1048576 bytes to 2097152 bytes. 
(kafka.log.LogCleaner) > [2023-03-17 17:53:47,401] INFO Cleaner 0: Growing cleaner I/O buffers from > 2097152 bytes to 4194304 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,409] INFO Cleaner 0: Growing cleaner I/O buffers from > 4194304 bytes to 8388608 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,434] INFO Cleaner 0: Growing cleaner I/O buffers from > 8388608 bytes to 10485772 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,465] WARN [kafka-log-cleaner-thread-0]: Unexpected > exception thrown when cleaning log > Log(dir=/opt/kafka-service/data/__consumer_offsets-24, > topic=__consumer_offsets, partition=24, highWatermark=0, lastStableOffset=0, > logStartOffset=0, logEndOffset=2759760). Marking its partition > (__consumer_offsets-24) as uncleanable (kafka.log.LogCleaner) > kafka.log.LogCleaningException: Batch size 223 < buffer size 10485772, but > not processed for log segment > /opt/kafka-service/data/__consumer_offsets-24/.log at > position 31457091 > at > kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:356) > at > kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332) >
[jira] [Commented] (KAFKA-14817) LogCleaner mark some partitions of __consumer_offsets as uncleanable
[ https://issues.apache.org/jira/browse/KAFKA-14817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719250#comment-17719250 ] Luke Chen commented on KAFKA-14817: --- [~polaris.alioth] [~mrMigles], thanks for reporting this issue. The error means we cannot recognize the batch at that log position (maybe corrupted), so we cannot proceed with compaction for this partition's logs. Like [~mrMigles] pointed out, letting Kafka do log recovery by itself should be able to fix it. As for how we can fix this permanently, I have to think about it. But before that, could you help verify whether the log segments that appear in the log are indeed corrupted? You can verify it by running this command to dump the uncleanable logs: {code:java} ./bin/kafka-dump-log.sh --files /tmp/kraft-combined-logs/quickstart-events-0/.log{code} Of course, if it's allowed, it would be better if you could upload the uncleanable logs for me to investigate. But I can understand if it's not permitted. Thanks. > LogCleaner mark some partitions of __consumer_offsets as uncleanable > > > Key: KAFKA-14817 > URL: https://issues.apache.org/jira/browse/KAFKA-14817 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.1 >Reporter: ZhenChun Pan >Priority: Major > > We find some partitions of topic __consumer_offsets can't clean their logs > any more and take up a lot of disk space. Then we found these partitions of > topic __consumer_offsets are marked as uncleanable in log-cleaner.log. The > logs below: > [2023-03-17 17:53:46,655] INFO Starting the log cleaner (kafka.log.LogCleaner) > [2023-03-17 17:53:46,770] INFO [kafka-log-cleaner-thread-0]: Starting > (kafka.log.LogCleaner) > [2023-03-17 17:53:46,841] INFO Cleaner 0: Beginning cleaning of log > __consumer_offsets-24. (kafka.log.LogCleaner) > [2023-03-17 17:53:46,841] INFO Cleaner 0: Building offset map for > __consumer_offsets-24... (kafka.log.LogCleaner) > [2023-03-17 17:53:47,013] INFO Cleaner 0: Building offset map for log > __consumer_offsets-24 for 5 segments in offset range [0, 2360519). > (kafka.log.LogCleaner) > [2023-03-17 17:53:47,394] INFO Cleaner 0: Growing cleaner I/O buffers from > 262144 bytes to 524288 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,395] INFO Cleaner 0: Growing cleaner I/O buffers from > 524288 bytes to 1048576 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,396] INFO Cleaner 0: Growing cleaner I/O buffers from > 1048576 bytes to 2097152 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,401] INFO Cleaner 0: Growing cleaner I/O buffers from > 2097152 bytes to 4194304 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,409] INFO Cleaner 0: Growing cleaner I/O buffers from > 4194304 bytes to 8388608 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,434] INFO Cleaner 0: Growing cleaner I/O buffers from > 8388608 bytes to 10485772 bytes. (kafka.log.LogCleaner) > [2023-03-17 17:53:47,465] WARN [kafka-log-cleaner-thread-0]: Unexpected > exception thrown when cleaning log > Log(dir=/opt/kafka-service/data/__consumer_offsets-24, > topic=__consumer_offsets, partition=24, highWatermark=0, lastStableOffset=0, > logStartOffset=0, logEndOffset=2759760). 
Marking its partition > (__consumer_offsets-24) as uncleanable (kafka.log.LogCleaner) > kafka.log.LogCleaningException: Batch size 223 < buffer size 10485772, but > not processed for log segment > /opt/kafka-service/data/__consumer_offsets-24/.log at > position 31457091 > at > kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:356) > at > kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:332) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:321) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > Caused by: java.lang.IllegalStateException: Batch size 223 < buffer size > 10485772, but not processed for log segment > /opt/kafka-service/data/__consumer_offsets-24/.log at > position 31457091 > at kafka.log.Cleaner.growBuffersOrFail(LogCleaner.scala:745) > at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:983) > at kafka.log.Cleaner.$anonfun$buildOffsetMap$5(LogCleaner.scala:908) > at > kafka.log.Cleaner.$anonfun$buildOffsetMap$5$adapted(LogCleaner.scala:904) > at > scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.s
[GitHub] [kafka] mimaison merged pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
mimaison merged PR #13122: URL: https://github.com/apache/kafka/pull/13122 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #13659: MINOR: add docs to remind reader that impl of ConsumerPartitionAssign…
chia7712 commented on PR #13659: URL: https://github.com/apache/kafka/pull/13659#issuecomment-1534435900 @kirktrue could you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown
divijvaidya commented on code in PR #13623: URL: https://github.com/apache/kafka/pull/13623#discussion_r1184786130 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -62,6 +65,39 @@ class LogCleanerTest { Utils.delete(tmpdir) } + @Test + def testRemoveMetricsOnClose(): Unit = { +val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) +try { + val logCleaner = new LogCleaner(new CleanerConfig(true), +logDirs = Array(TestUtils.tempDir()), +logs = new Pool[TopicPartition, UnifiedLog](), +logDirFailureChannel = new LogDirFailureChannel(1), +time = time) + + // shutdown logCleaner so that metrics are removed + logCleaner.shutdown() + + val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) + val numMetricsRegistered = 5 + verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) Review Comment: Your suggestion will work (for now) while all metrics in this class are of type `gauge`, but later, if someone introduces a metric of another type and adds it to MetricNames, it will fail. Nevertheless, I have made the change as you suggested. The test could be changed later when someone adds a new metric. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sandybis commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
sandybis commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1534414079 @MPeli I found an AccessDenied exception again when a topic is deleted. This causes a crash the same way it used to when deleting messages. Could the same fix be applied to topic deletion as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
showuon commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1184761649 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,159 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return 
producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test"); +} else { +syncSend(producer, key, "test"); +} +key++; +sentRecords++;
[jira] [Updated] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult
[ https://issues.apache.org/jira/browse/KAFKA-14967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Shaw updated KAFKA-14967: --- Description: Calling {{CreateTopicsResult.topicId().get()}} throws {{{}NullPointerException{}}}, while {{KafkaAdminClient}} correctly returns the topicId. The NPE appears to be caused by [{{MockAdminClient.createTopics()}} calling {{future.complete(null)}}|https://github.com/apache/kafka/blame/trunk/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java#L394] Stacktrace: {code:java} java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37) [snip] Caused by: java.lang.NullPointerException at org.apache.kafka.common.internals.KafkaFutureImpl.lambda$thenApply$0(KafkaFutureImpl.java:60) at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) at org.apache.kafka.common.internals.KafkaFutureImpl.thenApply(KafkaFutureImpl.java:58) at org.apache.kafka.clients.admin.CreateTopicsResult.topicId(CreateTopicsResult.java:82) ... 85 more {code} Test case to reproduce: {code:java} import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.Node; import org.apache.kafka.common.Uuid; import org.junit.jupiter.api.Test; import java.util.Optional; import java.util.concurrent.ExecutionException; import static java.util.Collections.singletonList; public class MockAdminClientBug { @Test void shouldNotThrowNullPointerException() throws ExecutionException, InterruptedException { Node controller = new Node(0, "mock", 0); try (Admin admin = new MockAdminClient(singletonList(controller), controller)) { CreateTopicsResult result = admin.createTopics(singletonList(new NewTopic("TestTopic", Optional.empty(), Optional.empty(; Uuid topicId = result.topicId("TestTopic").get(); System.out.println(topicId); } } } {code} was: Calling {{CreateTopicsResult.topicId().get()}} throws {{{}NullPointerException{}}}, while {{KafkaAdminClient}} correctly returns the topicId. 
Stacktrace: {code:java} java.util.concurrent.ExecutionException: java.lang.NullPointerException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37) [snip] Caused by: java.lang.NullPointerException at org.apache.kafka.common.internals.KafkaFutureImpl.lambda$thenApply$0(KafkaFutureImpl.java:60) at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) at org.apache.kafka.common.internals.KafkaFutureImpl.thenApply(KafkaFutureImpl.java:58) at org.apache.kafka.clients.admin.CreateTopicsResult.topicId(CreateTopicsResult.java:82) ... 85 more {code} Test case to reproduce: {code:java} import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.Node; import org.apache.kafka.common.Uuid; import org.junit.jupiter.api.Test; import java.util.Optional; import java.util.concurrent.ExecutionException; import static java.util.Collections.singletonList; public class MockAdminClientBug { @Test void shouldNotThrowNullPointerException() throws ExecutionException, InterruptedException { Node controller = new Node(0, "mock", 0); try (Admin admin = new MockAdminClient(singletonList(controller), controller)) { CreateTopicsResult result = admin.createTopics(singletonList(new NewTopic("TestTopic", Optional.empty(), Optional.empty(;
[GitHub] [kafka] showuon commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
showuon commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1184756806 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,159 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return 
producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test"); +} else { +syncSend(producer, key, "test"); +} +key++; +sentRecords++;
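The quoted hunk breaks off before the new helper methods it calls. As a rough sketch, reconstructed from the configuration code removed above, the helpers might look like the following; the generic parameters, the client-id value, and the callback body are assumptions, since the quoted diff lost its angle-bracket content and the implementations are truncated:
{code:java}
// Sketch only: createKafkaProducer() rebuilt from the removed constructor
// logic; asyncSend()/syncSend() are hypothetical renderings of the two modes.
private KafkaProducer<Integer, String> createKafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    if (transactionTimeoutMs > 0) {
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
    }
    if (transactionalId != null) {
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
    }
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
    return new KafkaProducer<>(props);
}

private void asyncSend(KafkaProducer<Integer, String> producer, int key, String value) {
    // async mode: hand the record off and report the outcome in the callback
    producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
    });
}

private void syncSend(KafkaProducer<Integer, String> producer, int key, String value)
        throws ExecutionException, InterruptedException {
    // sync mode: block on the returned future until the broker acknowledges the record
    producer.send(new ProducerRecord<>(topic, key, value)).get();
}
{code}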
[jira] [Updated] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult
[ https://issues.apache.org/jira/browse/KAFKA-14967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Shaw updated KAFKA-14967: --- Description: Calling {{CreateTopicsResult.topicId().get()}} throws {{NullPointerException}}, while {{KafkaAdminClient}} correctly returns the topicId. Stacktrace:
{code:java}
java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
    [snip]
Caused by: java.lang.NullPointerException
    at org.apache.kafka.common.internals.KafkaFutureImpl.lambda$thenApply$0(KafkaFutureImpl.java:60)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
    at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168)
    at org.apache.kafka.common.internals.KafkaFutureImpl.thenApply(KafkaFutureImpl.java:58)
    at org.apache.kafka.clients.admin.CreateTopicsResult.topicId(CreateTopicsResult.java:82)
    ... 85 more
{code}
Test case to reproduce:
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static java.util.Collections.singletonList;

public class MockAdminClientBug {

    @Test
    void shouldNotThrowNullPointerException() throws ExecutionException, InterruptedException {
        Node controller = new Node(0, "mock", 0);
        try (Admin admin = new MockAdminClient(singletonList(controller), controller)) {
            CreateTopicsResult result = admin.createTopics(singletonList(
                new NewTopic("TestTopic", Optional.empty(), Optional.empty())));
            Uuid topicId = result.topicId("TestTopic").get();
            System.out.println(topicId);
        }
    }
}
{code}
was: Calling {{CreateTopicsResult.topicId().get()}} throws {{NullPointerException}}, while {{KafkaAdminClient}} correctly returns the topicId.
Stacktrace:
{code:java}
java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
    [snip]
{code}
[jira] [Updated] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult
[ https://issues.apache.org/jira/browse/KAFKA-14967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Shaw updated KAFKA-14967: --- Description: Calling {{CreateTopicsResult.topicId().get()}} throws {{NullPointerException}}, while {{KafkaAdminClient}} correctly returns the topicId. Stacktrace:
{code:java}
java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37)
    [snip]
{code}
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960378 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,159 @@ [same Producer.java hunk as quoted in showuon's review above]
[jira] [Created] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult
James Shaw created KAFKA-14967: -- Summary: MockAdminClient throws NullPointerException in CreateTopicsResult Key: KAFKA-14967 URL: https://issues.apache.org/jira/browse/KAFKA-14967 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.4.0 Reporter: James Shaw
Calling {{CreateTopicsResult.topicId().get()}} throws {{NullPointerException}}, while {{KafkaAdminClient}} correctly returns the topicId. Test case to reproduce:
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static java.util.Collections.singletonList;

public class MockAdminClientBug {

    @Test
    void shouldNotThrowNullPointerException() throws ExecutionException, InterruptedException {
        Node controller = new Node(0, "mock", 0);
        try (Admin admin = new MockAdminClient(singletonList(controller), controller)) {
            CreateTopicsResult result = admin.createTopics(singletonList(
                new NewTopic("TestTopic", Optional.empty(), Optional.empty())));
            Uuid topicId = result.topicId("TestTopic").get();
            System.out.println(topicId);
        }
    }
}
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
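Judging from the stack trace in the report, the failure mode is that the per-topic future gets completed with a null value, which {{CreateTopicsResult.topicId()}} then maps over. A self-contained sketch of that mechanism follows; the stand-in class and the diagnosis of MockAdminClient's internals are assumptions, not a confirmed reading of the code:
{code:java}
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.KafkaFutureImpl;

public class NpeMechanismSketch {
    // Hypothetical stand-in for CreateTopicsResult.TopicMetadataAndConfig.
    static class TopicMetadataAndConfig {
        Uuid topicId() {
            return Uuid.randomUuid();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
        future.complete(null); // a mock that never populates the value

        try {
            // Mirrors CreateTopicsResult.topicId(): map the completed value to
            // its id. The method reference dereferences null, so the derived
            // future completes exceptionally with a NullPointerException.
            Uuid id = future.thenApply(TopicMetadataAndConfig::topicId).get();
            System.out.println(id);
        } catch (ExecutionException e) {
            System.out.println("Fails like the report: " + e.getCause());
        }
    }
}
{code}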
[GitHub] [kafka] fvaleri commented on a diff in pull request #13669: MINOR: Fix producer Callback comment
fvaleri commented on code in PR #13669: URL: https://github.com/apache/kafka/pull/13669#discussion_r1184732376 ## clients/src/main/java/org/apache/kafka/clients/producer/Callback.java: ## @@ -36,7 +36,7 @@ public interface Callback { * Non-Retriable exceptions (fatal, the message will never be sent): * * InvalidTopicException - * OffsetMetadataTooLargeException + * OffsetMetadataTooLarge Review Comment: Hi @machi1990, that's a good idea. I think what you propose would help prevent errors like this. Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
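machi1990's proposal is not quoted here, but the gist appears to be a guard against javadoc drift of this kind. One possible shape for such a check, purely as a hypothetical sketch (note that {{OffsetMetadataTooLarge}} really is the class name in org.apache.kafka.common.errors, with no Exception suffix, which is what the diff above corrects):
{code:java}
import java.util.Arrays;
import java.util.List;

public class CallbackJavadocGuard {
    // Exception names listed in the Callback javadoc (subset shown).
    static final List<String> LISTED_NAMES = Arrays.asList(
            "InvalidTopicException",
            "OffsetMetadataTooLarge");

    public static void main(String[] args) throws ClassNotFoundException {
        for (String name : LISTED_NAMES) {
            // Throws ClassNotFoundException if the javadoc drifts from the code.
            Class.forName("org.apache.kafka.common.errors." + name);
        }
        System.out.println("All javadoc exception names resolve to real classes.");
    }
}
{code}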
[GitHub] [kafka] dajac commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown
dajac commented on code in PR #13623: URL: https://github.com/apache/kafka/pull/13623#discussion_r1184723565 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -62,6 +65,39 @@ class LogCleanerTest { Utils.delete(tmpdir) } + @Test + def testRemoveMetricsOnClose(): Unit = { +val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) +try { + val logCleaner = new LogCleaner(new CleanerConfig(true), +logDirs = Array(TestUtils.tempDir()), +logs = new Pool[TopicPartition, UnifiedLog](), +logDirFailureChannel = new LogDirFailureChannel(1), +time = time) + + // shutdown logCleaner so that metrics are removed + logCleaner.shutdown() + + val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0) + val numMetricsRegistered = 5 + verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) Review Comment: That makes sense. Thanks for the clarification. I wonder if we could use `LogCleaner.MetricNames.size` instead of hardcoding `5`. I suppose it would have the same semantics, wouldn't it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
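As a standalone Java illustration of the pattern under discussion (the real test is the Scala one above and mocks KafkaMetricsGroup construction inside LogCleaner; the classes and the five gauge names below are stand-ins), deriving the expected count from a shared name collection keeps the assertion from going stale:
{code:java}
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.List;

import org.mockito.MockedConstruction;

public class MetricsCleanupSketch {
    // Hypothetical stand-ins for KafkaMetricsGroup and a component that
    // registers one gauge per name and removes them all on shutdown.
    static class MetricsGroup {
        void newGauge(String name) { }
        void removeMetric(String name) { }
    }

    static final List<String> METRIC_NAMES = Arrays.asList(
            "max-buffer-utilization-percent", "cleaner-recopy-percent",
            "max-clean-time-secs", "max-compaction-delay-secs", "DeadThreadCount");

    static class Component {
        private final MetricsGroup group = new MetricsGroup();

        Component() {
            METRIC_NAMES.forEach(group::newGauge);
        }

        void shutdown() {
            METRIC_NAMES.forEach(group::removeMetric);
        }
    }

    public static void main(String[] args) {
        // Intercept construction of MetricsGroup so the instance created
        // inside Component is a mock whose invocations we can count.
        try (MockedConstruction<MetricsGroup> ctor = mockConstruction(MetricsGroup.class)) {
            new Component().shutdown();
            MetricsGroup group = ctor.constructed().get(0);
            // The expected counts come from the shared list, not a literal 5.
            verify(group, times(METRIC_NAMES.size())).newGauge(anyString());
            verify(group, times(METRIC_NAMES.size())).removeMetric(anyString());
        }
    }
}
{code}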
[GitHub] [kafka] dajac commented on pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on PR #13666: URL: https://github.com/apache/kafka/pull/13666#issuecomment-1534317488 @kirktrue Thanks for your comments. I have addressed them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1184718825 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * The lock for protecting access to the resources. + */ +private final ReentrantLock lock; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.lock = new ReentrantLock(); +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread(threadPrefix + threadId) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. + */ +class EventProcessorThread extends ShutdownableThread { +EventProcessorThread( +String name +) { +super(name, false); +} + +@Override +public void doWork() { +while (!shuttingDown) { Review Comment: It is not required but I agree that it is confusing. I think that I will just use a regular `Thread` because it better fits my need. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
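For context, a sketch of the plain-Thread variant being alluded to, reusing the loop from the quoted doWork(). This would sit inside MultiThreadedEventProcessor and reuse its log, accumulator, and shuttingDown fields; it is a sketch, not the committed code:
{code:java}
/**
 * Sketch: the same polling loop as the quoted doWork(), but on a plain
 * Thread whose lifetime is governed directly by the shuttingDown flag
 * instead of ShutdownableThread's own isRunning state.
 */
class EventProcessorThread extends Thread {
    EventProcessorThread(String name) {
        super(name);
    }

    @Override
    public void run() {
        while (!shuttingDown) {
            CoordinatorEvent event = accumulator.poll();
            if (event == null) continue;

            try {
                event.run();
            } catch (Throwable t) {
                log.error("Failed to run event {}.", event, t);
            }
        }
    }
}
{code}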
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1184688955 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java: ## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +/** + * The base event type used by all events processed in the + * coordinator runtime. + */ +public interface CoordinatorEvent extends EventAccumulator.Event { +/** + * Runs the event. + */ +void run(); Review Comment: Yeah, I was debating this as well. My conclusion was that `Runnable` is not needed anywhere. I mean that we never downcast the event to `Runnable`. Therefore I kept it simple and just defined the interface that I need. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
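To illustrate the reasoning (a hedged sketch of the consuming side, not code from the PR): the only consumer is the processor thread's loop, which invokes run() through the CoordinatorEvent type itself, so an extra Runnable supertype would never be exercised.
{code:java}
// Inside the processor loop: the event is consumed through its own type;
// no code path hands it to an Executor or Thread, so "extends Runnable"
// would add an unused capability.
CoordinatorEvent event = accumulator.poll();
if (event != null) {
    event.run(); // direct invocation on CoordinatorEvent, not Runnable
}
{code}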
[GitHub] [kafka] dajac commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor
dajac commented on code in PR #13666: URL: https://github.com/apache/kafka/pull/13666#discussion_r1184687089 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; + +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}} + * which guarantees that events sharing a partition key are not processed concurrently. + */ +public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + +/** + * The logger. + */ +private final Logger log; + +/** + * The accumulator. + */ +private final EventAccumulator accumulator; + +/** + * The processing threads. + */ +private final List threads; + +/** + * The lock for protecting access to the resources. + */ +private final ReentrantLock lock; + +/** + * A boolean indicated whether the event processor is shutting down. + */ +private volatile boolean shuttingDown; + +/** + * Constructor. + * + * @param logContextThe log context. + * @param threadPrefix The thread prefix. + * @param numThreadsThe number of threads. + */ +public MultiThreadedEventProcessor( +LogContext logContext, +String threadPrefix, +int numThreads +) { +this.log = logContext.logger(MultiThreadedEventProcessor.class); +this.shuttingDown = false; +this.lock = new ReentrantLock(); +this.accumulator = new EventAccumulator<>(); +this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> +new EventProcessorThread(threadPrefix + threadId) +).collect(Collectors.toList()); +this.threads.forEach(EventProcessorThread::start); +} + +/** + * The event processor thread. The thread pulls events from the + * accumulator and runs them. + */ +class EventProcessorThread extends ShutdownableThread { +EventProcessorThread( +String name +) { +super(name, false); +} + +@Override +public void doWork() { +while (!shuttingDown) { +CoordinatorEvent event = accumulator.poll(); +if (event == null) continue; + +try { +log.debug("Executing event " + event); +event.run(); +} catch (Throwable t) { +log.error("Failed to run event " + event + " due to: " + t, t); Review Comment: Nope. It is just a bad habit, I suppose. Let me change them. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
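The "bad habit" being acknowledged is the string concatenation in the quoted log calls. The usual slf4j fix is parameterized logging, roughly as below (a sketch of the direction agreed on; the committed change may differ):
{code:java}
// Parameterized logging: placeholders defer message construction until the
// level is known to be enabled, and a trailing Throwable argument still
// gets its stack trace logged.
log.debug("Executing event {}.", event);
log.error("Failed to run event {}.", event, t);
{code}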