[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
fvaleri commented on code in PR #13562: URL: https://github.com/apache/kafka/pull/13562#discussion_r1286654127 ## tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java: ## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.PartitionFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter; +import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.server.util.TopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.function.IntFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class GetOffsetShell { +private static final Pattern 
TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?"); + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println("Error occurred: " + e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println("Error occurred: " + e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException { +GetOffsetShell getOffsetShell = new GetOffsetShell(); + +GetOffsetShellOptions options = new GetOffsetShellOptions(args); + +Map partitionOffsets = getOffsetShell.fetchOffsets(options); + +for (Map.Entry entry : partitionOffsets.entrySet()) { +TopicPartition topic = entry.getKey(); + +System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()})); +} +} + +private static class GetOffsetShellOptions extends CommandDefaultOptions { +private final OptionSpec brokerListOpt; +private final OptionSpec bootstrapServerOpt; +private final OptionSpec topicPartitionsOpt; +private final OptionSpec topicOpt; +private final OptionSpec partitionsOpt; +private final OptionSpec timeOpt; +private final OptionSpec commandConfigOpt; +private final OptionSpec effectiveBrokerListOpt; +private final OptionSpecBuilder excludeInternalTopicsOpt; + +public GetOffsetShellOptions(String[] args) throws TerseException { +super(args); + +brokerListOpt = parser.accepts("broker-list
[GitHub] [kafka] abhijeetk88 commented on pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on PR #14127: URL: https://github.com/apache/kafka/pull/14127#issuecomment-1668965018 Thanks for the review @showuon. I have addressed your comments and responded to your questions. Please take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286634284 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + +private final int numMetadataTopicPartitions = 5; +private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); +private final DummyEventHandler handler = new DummyEventHandler(); +private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() +.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); +private final Uuid topicId = Uuid.randomUuid(); +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +private ConsumerTask consumerTask; +private MockConsumer consumer; +private Thread thread; + +@BeforeEach +public void beforeEach() { +final Map offsets = remoteLogPartitions.stream() +.collect(Collectors.toMap(Function.identity(), e -> 0L)); +consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +consumer.updateBeginningOffsets(offsets); +ConsumerTask.pollIntervalMs = 10L; +consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); +thread = new Thread(consumerTask); +} + +@AfterEach +public void afterEach() throws InterruptedException { +if (thread != null) { +consumerTask.close(); +thread.join(); +} +} + +@Test +public void testCloseOnNoAssignment() throws InterruptedException { +thread.start(); +Thread.sleep(10); +
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286633888 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + +private final int numMetadataTopicPartitions = 5; +private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); +private final DummyEventHandler handler = new DummyEventHandler(); +private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() +.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); +private final Uuid topicId = Uuid.randomUuid(); +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +private ConsumerTask consumerTask; +private MockConsumer consumer; +private Thread thread; + +@BeforeEach +public void beforeEach() { +final Map offsets = remoteLogPartitions.stream() +.collect(Collectors.toMap(Function.identity(), e -> 0L)); +consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +consumer.updateBeginningOffsets(offsets); +ConsumerTask.pollIntervalMs = 10L; +consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); +thread = new Thread(consumerTask); +} + +@AfterEach +public void afterEach() throws InterruptedException { +if (thread != null) { +consumerTask.close(); +thread.join(); +} +} + +@Test +public void testCloseOnNoAssignment() throws InterruptedException { +thread.start(); +Thread.sleep(10); +
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286633719 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + +private final int numMetadataTopicPartitions = 5; +private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); +private final DummyEventHandler handler = new DummyEventHandler(); +private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() +.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); +private final Uuid topicId = Uuid.randomUuid(); +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +private ConsumerTask consumerTask; +private MockConsumer consumer; +private Thread thread; + +@BeforeEach +public void beforeEach() { +final Map offsets = remoteLogPartitions.stream() +.collect(Collectors.toMap(Function.identity(), e -> 0L)); +consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +consumer.updateBeginningOffsets(offsets); +ConsumerTask.pollIntervalMs = 10L; +consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); +thread = new Thread(consumerTask); +} + +@AfterEach +public void afterEach() throws InterruptedException { +if (thread != null) { +consumerTask.close(); +thread.join(); +} +} + +@Test +public void testCloseOnNoAssignment() throws InterruptedException { +thread.start(); +Thread.sleep(10); +
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286632651 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + +private final int numMetadataTopicPartitions = 5; +private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); +private final DummyEventHandler handler = new DummyEventHandler(); +private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() +.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); +private final Uuid topicId = Uuid.randomUuid(); +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +private ConsumerTask consumerTask; +private MockConsumer consumer; +private Thread thread; + +@BeforeEach +public void beforeEach() { +final Map offsets = remoteLogPartitions.stream() +.collect(Collectors.toMap(Function.identity(), e -> 0L)); +consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +consumer.updateBeginningOffsets(offsets); +ConsumerTask.pollIntervalMs = 10L; +consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); +thread = new Thread(consumerTask); +} + +@AfterEach +public void afterEach() throws InterruptedException { +if (thread != null) { +consumerTask.close(); +thread.join(); +} +} + +@Test +public void testCloseOnNoAssignment() throws InterruptedException { +thread.start(); +Thread.sleep(10); +
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286632214 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + +private final int numMetadataTopicPartitions = 5; +private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); +private final DummyEventHandler handler = new DummyEventHandler(); +private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() +.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); +private final Uuid topicId = Uuid.randomUuid(); +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +private ConsumerTask consumerTask; +private MockConsumer consumer; +private Thread thread; + +@BeforeEach +public void beforeEach() { +final Map offsets = remoteLogPartitions.stream() +.collect(Collectors.toMap(Function.identity(), e -> 0L)); +consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +consumer.updateBeginningOffsets(offsets); +ConsumerTask.pollIntervalMs = 10L; +consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); +thread = new Thread(consumerTask); +} + +@AfterEach +public void afterEach() throws InterruptedException { +if (thread != null) { +consumerTask.close(); +thread.join(); +} +} + +@Test +public void testCloseOnNoAssignment() throws InterruptedException { +thread.start(); +Thread.sleep(10); Revi
[GitHub] [kafka] alok123t commented on a diff in pull request #14162: KAFKA-15312; Force channel before atomic file move
alok123t commented on code in PR #14162: URL: https://github.com/apache/kafka/pull/14162#discussion_r1286557850 ## raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java: ## @@ -112,6 +112,9 @@ public void freeze() { checkIfFrozen("Freeze"); frozenSize = channel.size(); +// force the channel to write to the file system before closing, this guarantees that the file has the data +// on disk before preforming the atomic file move Review Comment: typo: performing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751878#comment-17751878 ] Guozhang Wang commented on KAFKA-15259: --- Got it, KAFKA-15309 makes sense. > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using exactly_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using exactly_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > exactly_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using exactly_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with exactly_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. 
And the stream thread > shuts down as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My expected behavior is that Kafka Streams should continue processing > even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE > is used. > [As far as my investigation] > - FYI, if using at_least_once instead of exactly_once, Kafka Streams > continues processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. > (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. 
> org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Transiting to abortable error state > due to org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-Stre
[jira] [Updated] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomonari Yamashita updated KAFKA-15259: --- Description: [Problem] - Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once. -- "CONTINUE will signal that Streams should ignore the issue and continue processing"(1), so Kafka Streams should continue processing even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE used. -- However, if using exactly_once, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) [Environment] - Kafka Streams 3.5.1 [Reproduction procedure] # Create "input-topic" topic and "output-topic" # Put several messages on "input-topic" # Execute a simple Kafka streams program that transfers too large messages from "input-topic" to "output-topic" with exactly_once and returns ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the producer. Please refer to the reproducer program (attached file: Reproducer.java). # ==> However, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread shutdown as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to the debug log (attached file: app_exactly_once.log). ## My excepted behavior is that Kafka Streams should continue processing even if using exactly_once. when ProductionExceptionHandlerResponse.CONTINUE used. [As far as my investigation] - FYI, if using at_least_once instead of exactly_once, Kafka Streams continue processing without rollback when ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the debug log (attached file: app_at_least_once.log). 
- "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs. (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler - [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] (2) Transaction abort and shutdown occur {code:java} 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Exception occurred during message send: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] stream-task [0_0] Error encountered sending record to topic output-topic for task 0_0 due to: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded. org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Transiting to abortable error state due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 
2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Transition from state IN_TRANSACTION to error state ABORTABLE_ERROR org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 2023-07-26 21:27:19 DEBUG StreamThread:825 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Processed 1 records with 1 iterations; invoking punctuators if necessary 2023-07-26 21:27:19 DEBUG StreamThread:837 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 0 punctuators ran. 2023-07-26 21:27:19 DEBUG StreamThread:1117 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Committing all active tasks [0_0] an
[jira] [Updated] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomonari Yamashita updated KAFKA-15259: --- Summary: Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once (was: Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once) > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using exactly_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). 
> # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shutdown as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My excepted behavior is that Kafka Streams should continue processing > even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE > used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continue processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. > (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. 
> Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Transiting to abortable error state > due to org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger t
[jira] [Updated] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomonari Yamashita updated KAFKA-15259: --- Description: [Problem] - Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once. -- "CONTINUE will signal that Streams should ignore the issue and continue processing"(1), so Kafka Streams should continue processing even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE used. -- However, if using execute_once, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) [Environment] - Kafka Streams 3.5.1 [Reproduction procedure] # Create "input-topic" topic and "output-topic" # Put several messages on "input-topic" # Execute a simple Kafka streams program that transfers too large messages from "input-topic" to "output-topic" with exactly_once and returns ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the producer. Please refer to the reproducer program (attached file: Reproducer.java). # ==> However, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread shutdown as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to the debug log (attached file: app_exactly_once.log). ## My excepted behavior is that Kafka Streams should continue processing even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE used. [As far as my investigation] - FYI, if using at_least_once instead of execute_once, Kafka Streams continue processing without rollback when ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the debug log (attached file: app_at_least_once.log). 
- "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs. (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler - [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] (2) Transaction abort and shutdown occur {code:java} 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Exception occurred during message send: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] stream-task [0_0] Error encountered sending record to topic output-topic for task 0_0 due to: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded. org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Transiting to abortable error state due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 
2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Transition from state IN_TRANSACTION to error state ABORTABLE_ERROR org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 2023-07-26 21:27:19 DEBUG StreamThread:825 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Processed 1 records with 1 iterations; invoking punctuators if necessary 2023-07-26 21:27:19 DEBUG StreamThread:837 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 0 punctuators ran. 2023-07-26 21:27:19 DEBUG StreamThread:1117 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Committing all active tasks [0_0] an
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751862#comment-17751862 ] Tomonari Yamashita commented on KAFKA-15259: Thank you for creating the new ticket for Kafka Producer: https://issues.apache.org/jira/browse/KAFKA-15309. I think that similar problems occur not only with Kafka Streams but also with Kafka Producer, so I agree that it needs to be resolved first in the Kafka Producer ticket. > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. 
Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shutdown as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My excepted behavior is that Kafka Streams should continue processing > even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE > used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continue processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. > (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. 
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Transiting to abortable error state > due to org.apache.kafka.common.errors.RecordTooLargeException: The message is
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751836#comment-17751836 ] Matthias J. Sax commented on KAFKA-15302: - {quote}it's a combo of a cache snapshot and the underlying RocksDB's {quote} I think this is actually the bug: we actually don't return a snapshot over the cache. If it would be a snapshot, flushing the cache would not modify it, but it does as you pointed out: {quote}that modifying the store for `keyA` might change the content of `KeyB` compared with the snapshot {quote} I am totally in favor of "decouple flushing with forwarding" though, independent of this ticket. But that's a larger piece of work anyway, and I am also not sure if we would want to make such a change in-place, or via DSL 2.0? {quote}letting any range queries to flush cache first, and then only return from the underlying store. {quote} That's an interesting idea. – A naive fix would be, to actually make a "shallow copy (with copy on write)" of the named cache when opening an iterator, to guard the shallow copy for eviction. But this is also hard to get right and could be very memory intensive... In case we cannot provide a fix easily, updating the docs is for sure something we should do. > Stale value returned when using store.all() in punctuation function. > > > Key: KAFKA-15302 > URL: https://issues.apache.org/jira/browse/KAFKA-15302 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Jinyong Choi >Priority: Major > > When using the store.all() function within the Punctuation function of > this.context.schedule, the previous value is returned. In other words, even > though the value has been stored from 1 to 2, it doesn't return 2; instead, > it returns 1. 
> In the provided test code, you can see the output 'BROKEN !!!', and while > this doesn't occur 100% of the time, by adding logs, it's evident that during > the while loop after all() is called, the cache is flushed. As a result, the > named cache holds a null value, causing the return of a value from RocksDB. > This is observed as the value after the .get() call is different from the > expected value. This is possibly due to the consistent read functionality of > RocksDB, although the exact cause is not certain. > Of course, if you perform {{store.flush()}} before {{all()}} there won't be > any errors. > > * test code (forked from balajirrao and modified for this) > [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main] > > {code:java} > private void forwardAll(final long timestamp) { > // > System.err.println("forwardAll Start"); KeyValueIterator Integer> kvList = this.kvStore.all(); > while (kvList.hasNext()) { > KeyValue entry = kvList.next(); > final Record msg = new Record<>(entry.key, > entry.value, context.currentSystemTimeMs()); > final Integer storeValue = this.kvStore.get(entry.key); if > (entry.value != storeValue) { > System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: > " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + > " but KeyValueIterator value: " + entry.value); > throw new RuntimeException("Broken!"); > } this.context.forward(msg); > } > kvList.close(); > } > {code} > * log file (add log in stream source) > > {code:java} > # console log > sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1" > [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20) > ... > [info] running Coordinator 1 > appid: 95108c48-7c69-4eeb-adbd-9d091bd84933 > [0] starting instance +1 > forwardAll Start > [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but > KeyValueIterator value: 1 > # log file > ... 
> 01:05:00.382 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on > flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401 > 01:05:00.388 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush > dirtyKeys.size():7873 entries:7873 > 01:05:00.434 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > stream-task [0_0] Flushed cache or buffer Counts > ... > 01:05:00.587 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751835#comment-17751835 ] Matthias J. Sax commented on KAFKA-15259: - Ah. Sorry. Meant https://issues.apache.org/jira/browse/KAFKA-15309 (sorry for the c&p error – fixed in the above two comments). {quote}KS could tell KP to not transit to errors for any `abortable` error type but just ignore and continue? {quote} That's not so easy. Currently, the producer goes into ERROR state, so we need to actually close the producer and would need to create a new one, including falling back to last committed offset and retrying the failed transactions. If the error is "deterministic" (like a `RecordTooLargeException`) we would just hit it again on re-try, and would need to do more bookkeeping to actually avoid hitting the error again. That's why it seems simpler to actually skip the error inside the producer (-> https://issues.apache.org/jira/browse/KAFKA-15309) to avoid that the producer goes into ERROR state to begin with. > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. 
> -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shutdown as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My excepted behavior is that Kafka Streams should continue processing > even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE > used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continue processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. 
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > Exception handler choose to CONTINUE processing in spite of
[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751759#comment-17751759 ] Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:59 PM: -- {quote}Should we create a new ticket for Kafka Producer? {quote} As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15309 for the producer already. If it's not covering what you think we need, just leave a comment on the ticket. was (Author: mjsax): {quote}Should we create a new ticket for Kafka Producer? {quote} As mentioned above, I created -https://issues.apache.org/jira/browse/KAFKA-15259- https://issues.apache.org/jira/browse/KAFKA-15309 for the producer already. If it's not covering what you think we need, just leave a comment on the ticket. > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. 
And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shutdown as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My excepted behavior is that Kafka Streams should continue processing > even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE > used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continue processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. 
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.requ
[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751759#comment-17751759 ] Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM: -- {quote}Should we create a new ticket for Kafka Producer? {quote} As mentioned above, I created -https://issues.apache.org/jira/browse/KAFKA-15259- https://issues.apache.org/jira/browse/KAFKA-15309 for the producer already. If it's not covering what you think we need, just leave a comment on the ticket. was (Author: mjsax): {quote}Should we create a new ticket for Kafka Producer? {quote} As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15259 for the producer already. If it's not covering what you think we need, just leave a comment on the ticket. > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. 
And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shutdown as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My excepted behavior is that Kafka Streams should continue processing > even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE > used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continue processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. 
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.requ
[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751267#comment-17751267 ] Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM: -- I did sync up with [~cegerton] who worked on https://issues.apache.org/jira/browse/KAFKA-9279, and we come up with this idea: adding a "production exception handler" to the producer that would allow KS to tell the producer to not fail the TX but skip the record: https://issues.apache.org/jira/browse/KAFKA-15309. If we cannot do K15259, an alternative might be, to add an internal producer config that allow Kafka Streams to disable the pro-active abort of a TX. This would be safe, because Kafka Streams is actually a good citizen and calls `producer.flush()` and evaluates all callbacks before trying to commit – the issue K9279 addresses is actually bad user behavior to not check for async errors before committing. was (Author: mjsax): I did sync up with [~cegerton] who worked on https://issues.apache.org/jira/browse/KAFKA-9279, and we come up with this idea: adding a "production exception handler" to the producer that would allow KS to tell the producer to not fail the TX but skip the record: -https://issues.apache.org/jira/browse/KAFKA-15259- https://issues.apache.org/jira/browse/KAFKA-15309. If we cannot do K15259, an alternative might be, to add an internal producer config that allow Kafka Streams to disable the pro-active abort of a TX. This would be safe, because Kafka Streams is actually a good citizen and calls `producer.flush()` and evaluates all callbacks before trying to commit – the issue K9279 addresses is actually bad user behavior to not check for async errors before committing. 
> Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shutdown as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My excepted behavior is that Kafka Streams should continue processing > even if using execute_once. 
when ProductionExceptionHandlerResponse.CONTINUE > used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continue processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. > (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred dur
[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751267#comment-17751267 ] Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM: -- I did sync up with [~cegerton] who worked on https://issues.apache.org/jira/browse/KAFKA-9279, and we come up with this idea: adding a "production exception handler" to the producer that would allow KS to tell the producer to not fail the TX but skip the record: -https://issues.apache.org/jira/browse/KAFKA-15259- https://issues.apache.org/jira/browse/KAFKA-15309. If we cannot do K15259, an alternative might be, to add an internal producer config that allow Kafka Streams to disable the pro-active abort of a TX. This would be safe, because Kafka Streams is actually a good citizen and calls `producer.flush()` and evaluates all callbacks before trying to commit – the issue K9279 addresses is actually bad user behavior to not check for async errors before committing. was (Author: mjsax): I did sync up with [~cegerton] who worked on https://issues.apache.org/jira/browse/KAFKA-9279, and we come up with this idea: adding a "production exception handler" to the producer that would allow KS to tell the producer to not fail the TX but skip the record: https://issues.apache.org/jira/browse/KAFKA-15259 If we cannot do K15259, an alternative might be, to add an internal producer config that allow Kafka Streams to disable the pro-active abort of a TX. This would be safe, because Kafka Streams is actually a good citizen and calls `producer.flush()` and evaluates all callbacks before trying to commit – the issue K9279 addresses is actually bad user behavior to not check for async errors before committing. 
> Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shutdown as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My excepted behavior is that Kafka Streams should continue processing > even if using execute_once. 
when ProductionExceptionHandlerResponse.CONTINUE > used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continue processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. > (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred dur
[jira] [Commented] (KAFKA-15197) Flaky test MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
[ https://issues.apache.org/jira/browse/KAFKA-15197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751833#comment-17751833 ] Greg Harris commented on KAFKA-15197: - [~divijvaidya] I apologize for the flakiness of the test; it's a lot worse than I expected. I've opened a patch that should fix the bug and stabilize the test, which we can hopefully get merged this week or next. > Flaky test > MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() > -- > > Key: KAFKA-15197 > URL: https://issues.apache.org/jira/browse/KAFKA-15197 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Reporter: Divij Vaidya >Priority: Major > Labels: flaky-test > Fix For: 3.6.0 > > > As of Jul 17th, this is the second most flaky test in our CI on trunk and > fails 46% of the time. > See: > [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe/Berlin] > > Note that MirrorConnectorsIntegrationExactlyOnceTest has multiple tests but > testOffsetTranslationBehindReplicationFlow is the one that is the reason for > most failures. See: > [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe/Berlin&tests.container=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest] > > > Reason for failure is: > |org.opentest4j.AssertionFailedError: Condition not met within timeout 2. > Offsets for consumer group consumer-group-lagging-behind not translated from > primary for topic primary.test-topic-1 ==> expected: but was: | -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
gharris1727 commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1286472665 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements +Plugin Discovery + +Plugin discovery is the name for the strategy which the Connect worker uses to find plugin classes and make them accessible to configure and run in Connectors and Tasks. This is controlled by the plugin.discovery worker configuration, and has a significant impact on worker startup time. SERVICE_LOAD is the fastest strategy, but care should be taken to verify that plugins are compatible before setting this configuration to SERVICE_LOAD. + +Prior to version 3.6, this strategy was not configurable, and behaved like the ONLY_SCAN mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to HYBRID_WARN which is also compatible with all plugins, but logs a warning for all plugins which are incompatible with the other modes. For unit-test environments that use the EmbeddedConnectCluster this defaults to the HYBRID_FAIL strategy, which stops the worker with an error if an incompatible plugin is detected. Finally, the SERVICE_LOAD strategy will silently hide incompatible plugins and make them unusable. Review Comment: I've rewritten this to make the previous paragraph more terse, and to explain a bit more in this paragraph about why the service_load strategy is faster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751830#comment-17751830 ] Guozhang Wang commented on KAFKA-15302: --- I'm not sure if this is defined as a bug that should be fixed in KS: the semantics of `store.all()` is to return a snapshot (behind the scenes, it's a combo of a cache snapshot and the underlying RocksDB's snapshot) at the time of the call, and if the store gets modified after the `all()` call, we do not guarantee that the entries from the `all()` iterator would return results consistent with `get()`, which is meant to return the latest value. What's confusing though is that the users may not think that modifying the store for `keyA` might change the content of `KeyB` compared with the snapshot --- as what's happened here due to evictions --- but I think unless we change how we execute the caching layer's put/delete calls and how they can trigger eviction (of other keys), this would always be the case. For now the only thing we can do probably is to clarify it in the docs. In the long run, if we feel such a confusion is really un-intuitive, we could consider 1) decoupling flushing from forwarding, and then 2) letting any range query flush the cache first, and then only return from the underlying store. > Stale value returned when using store.all() in punctuation function. > > > Key: KAFKA-15302 > URL: https://issues.apache.org/jira/browse/KAFKA-15302 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Jinyong Choi >Priority: Major > > When using the store.all() function within the Punctuation function of > this.context.schedule, the previous value is returned. In other words, even > though the value has been updated from 1 to 2, it doesn't return 2; instead, > it returns 1. 
> In the provided test code, you can see the output 'BROKEN !!!', and while > this doesn't occur 100% of the time, by adding logs, it's evident that during > the while loop after all() is called, the cache is flushed. As a result, the > named cache holds a null value, causing the return of a value from RocksDB. > This is observed as the value after the .get() call is different from the > expected value. This is possibly due to the consistent read functionality of > RocksDB, although the exact cause is not certain. > Of course, if you perform {{store.flush()}} before {{all()}} there won't be > any errors. > > * test code (forked from balajirrao and modified for this) > [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main] > > {code:java} > private void forwardAll(final long timestamp) { > // > System.err.println("forwardAll Start"); KeyValueIterator Integer> kvList = this.kvStore.all(); > while (kvList.hasNext()) { > KeyValue entry = kvList.next(); > final Record msg = new Record<>(entry.key, > entry.value, context.currentSystemTimeMs()); > final Integer storeValue = this.kvStore.get(entry.key); if > (entry.value != storeValue) { > System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: > " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + > " but KeyValueIterator value: " + entry.value); > throw new RuntimeException("Broken!"); > } this.context.forward(msg); > } > kvList.close(); > } > {code} > * log file (add log in stream source) > > {code:java} > # console log > sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1" > [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20) > ... > [info] running Coordinator 1 > appid: 95108c48-7c69-4eeb-adbd-9d091bd84933 > [0] starting instance +1 > forwardAll Start > [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but > KeyValueIterator value: 1 > # log file > ... 
> 01:05:00.382 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on > flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401 > 01:05:00.388 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush > dirtyKeys.size():7873 entries:7873 > 01:05:00.434 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > stream-task [0_0] Flushed cache or buffer Counts > ... > 01:05:00.587 > [951
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
gharris1727 commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1286461778 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements +Plugin Discovery + +Plugin discovery is the name for the strategy which the Connect worker uses to find plugin classes and make them accessible to configure and run in Connectors and Tasks. This is controlled by the plugin.discovery worker configuration, and has a significant impact on worker startup time. SERVICE_LOAD is the fastest strategy, but care should be taken to verify that plugins are compatible before setting this configuration to SERVICE_LOAD. + +Prior to version 3.6, this strategy was not configurable, and behaved like the ONLY_SCAN mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to HYBRID_WARN which is also compatible with all plugins, but logs a warning for all plugins which are incompatible with the other modes. For unit-test environments that use the EmbeddedConnectCluster this defaults to the HYBRID_FAIL strategy, which stops the worker with an error if an incompatible plugin is detected. Finally, the SERVICE_LOAD strategy will silently hide incompatible plugins and make them unusable. + +Verifying Plugin Compatibility + +To verify if all of your plugins are compatible, first ensure that you are using version 3.6 or later of the Connect runtime. You can then perform one of the following checks: + + +Start your worker with the default HYBRID_WARN strategy, and WARN logs enabled for the org.apache.kafka.connect package. At least one WARN log message mentioning the plugin.discovery configuration should be printed. This log message will explicitly say that all plugins are compatible, or list the incompatible plugins. +Start your worker in a test environment with HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and all incompatible plugins will be listed in the exception. + + +If the verification step succeeds, then your current set of installed plugins are compatible, and it should be safe to change the plugin.discovery configuration to SERVICE_LOAD. If you change the set of already-installed plugins, they may no longer be compatible, and you should repeat the above verification. If the verification fails, you must address the incompatible plugins before using the SERVICE_LOAD strategy. + +Operators: Artifact Migration + +As an operator of Connect, if you discover incompatible plugins, there are multiple ways to try to resolve the incompatibility. They are listed below from most to least preferable. + + +Upgrade your incompatible plugins to the latest release version from your plugin provider. +Contact your plugin provider and request that they migrate the plugin to be compatible, following the source migration instructions, and then upgrade to the migrated version. +Migrate the plugin artifacts yourself using the included migration script. + + +The migration script is located in bin/connect-plugin-path.sh and bin\windows\connect-plugin-path.bat of your Kafka installation. The script can migrate incompatible plugin artifacts already installed on your Connect worker's plugin.path by adding or modifying JAR or resource files. This is not suitable for environments using code-signing, as this may change the artifacts such that they will fail signature verification. View the built-in help with --help. + +To perform a migration, first use the list subcommand to get an overview of the plugins available to the script. You must tell the script where to find plugins, which can be done with the repeatable --worker-config, --plugin-path, and --plugin-location arguments. 
The script will only migrate plugins present in the paths specified, so if you add plugins to your worker's classpath, then you will need to specify those plugins via one or more --plugin-location arguments. + +Once you see that all incompatible plugins are included in the listing, you can proceed to dry-run the migration with sync-manifests --dry-run. This will perform all parts of the migration, except for writing the results of the migration to disk. Note that the sync-manifests command requires all specified paths to be writable, and may alter the contents of the directories. Make a backup of the specified paths, or copy them to a writable directory. + +Ensure that you have a backup and the dry-run succeeds before removing the --dry-run flag and actually running the migration. If the migration fails without the --dry-run flag, then the partially migrated artifacts should be discarded. The migration is idempotent, so running it multiple times and on already-migrated plugins is safe. After the migration is completed, you should v
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
gharris1727 commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1286460752 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements +Plugin Discovery + +Plugin discovery is the name for the strategy which the Connect worker uses to find plugin classes and make them accessible to configure and run in Connectors and Tasks. This is controlled by the plugin.discovery worker configuration, and has a significant impact on worker startup time. SERVICE_LOAD is the fastest strategy, but care should be taken to verify that plugins are compatible before setting this configuration to SERVICE_LOAD. + +Prior to version 3.6, this strategy was not configurable, and behaved like the ONLY_SCAN mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to HYBRID_WARN which is also compatible with all plugins, but logs a warning for all plugins which are incompatible with the other modes. For unit-test environments that use the EmbeddedConnectCluster this defaults to the HYBRID_FAIL strategy, which stops the worker with an error if an incompatible plugin is detected. Finally, the SERVICE_LOAD strategy will silently hide incompatible plugins and make them unusable. + +Verifying Plugin Compatibility + +To verify if all of your plugins are compatible, first ensure that you are using version 3.6 or later of the Connect runtime. You can then perform one of the following checks: + + +Start your worker with the default HYBRID_WARNstrategy, and WARN logs enabled for the org.apache.kafka.connect package. At least one WARN log message mentioning the plugin.discovery configuration should be printed. This log message will explicitly say that all plugins are compatible, or list the incompatible plugins. +Start your worker in a test environment with HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and all incompatible plugins will be listed in the exception. + + +If the verification step succeeds, then your current set of installed plugins are compatible, and it should be safe to change the plugin.discovery configuration to SERVICE_LOAD. If you change the set of already-installed plugins, they may no longer be compatible, and you should repeat the above verification. If the verification fails, you must address the incompatible plugins before using the SERVICE_LOAD strategy. + +Operators: Artifact Migration + +As an operator of Connect, if you discover incompatible plugins, there are multiple ways to try to resolve the incompatibility. They are listed below from most to least preferable. + + +Upgrade your incompatible plugins to the latest release version from your plugin provider. +Contact your plugin provider and request that they migrate the plugin to be compatible, following the source migration instructions, and then upgrade to the migrated version. +Migrate the plugin artifacts yourself using the included migration script. + + +The migration script is located in bin/connect-plugin-path.sh and bin\windows\connect-plugin-path.bat of your Kafka installation. The script can migrate incompatible plugin artifacts already installed on your Connect worker's plugin.path by adding or modifying JAR or resource files. This is not suitable for environments using code-signing, as this may change the artifacts such that they will fail signature verification. View the built-in help with --help. + +To perform a migration, first use the list subcommand to get an overview of the plugins available to the script. You must tell the script where to find plugins, which can be done with the repeatable --worker-config, --plugin-path, and --plugin-location arguments. 
The script will only migrate plugins present in the paths specified, so if you add plugins to your worker's classpath, then you will need to specify those plugins via one or more --plugin-location arguments. + +Once you see that all incompatible plugins are included in the listing, you can proceed to dry-run the migration with sync-manifests --dry-run. This will perform all parts of the migration, except for writing the results of the migration to disk. Note that the sync-manifests command requires all specified paths to be writable, and may alter the contents of the directories. Make a backup of the specified paths, or copy them to a writable directory. Review Comment: I added a "take note" mention to the verification section, and a "Be sure to compare" mention here as a call back. It also meshes well with the special case for the classpath plugins mentioned immediately before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specifi
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751824#comment-17751824 ] Guozhang Wang commented on KAFKA-15259: --- [~mjsax] Did you mean a different ticket other than KAFKA-15259 (which is this ticket)? BTW I think after the change you proposed, we could still improve the KS' layer of handling abortable and fatal txn errors from underlying producers on top of what's summarized in KIP-691: if people agree that when KS's exception is configured as CONTINUE, then `RecordTooLargeException` etc (maybe all exceptions causing `abortable` rather than `fatal` errors as in KIP-691), then KS could tell KP to not transit to errors for any `abortable` error type but just ignore and continue? > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. 
And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shuts down as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My expected behavior is that Kafka Streams should continue processing > even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE > is used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continues processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. 
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 INFO Trans
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
gharris1727 commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1286456172 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements +Plugin Discovery + +Plugin discovery is the name for the strategy which the Connect worker uses to find plugin classes and make them accessible to configure and run in Connectors and Tasks. This is controlled by the plugin.discovery worker configuration, and has a significant impact on worker startup time. SERVICE_LOAD is the fastest strategy, but care should be taken to verify that plugins are compatible before setting this configuration to SERVICE_LOAD. + +Prior to version 3.6, this strategy was not configurable, and behaved like the ONLY_SCAN mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to HYBRID_WARN which is also compatible with all plugins, but logs a warning for all plugins which are incompatible with the other modes. For unit-test environments that use the EmbeddedConnectCluster this defaults to the HYBRID_FAIL strategy, which stops the worker with an error if an incompatible plugin is detected. Finally, the SERVICE_LOAD strategy will silently hide incompatible plugins and make them unusable. + +Verifying Plugin Compatibility + +To verify if all of your plugins are compatible, first ensure that you are using version 3.6 or later of the Connect runtime. You can then perform one of the following checks: + + +Start your worker with the default HYBRID_WARNstrategy, and WARN logs enabled for the org.apache.kafka.connect package. At least one WARN log message mentioning the plugin.discovery configuration should be printed. This log message will explicitly say that all plugins are compatible, or list the incompatible plugins. +Start your worker in a test environment with HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and all incompatible plugins will be listed in the exception. + + +If the verification step succeeds, then your current set of installed plugins are compatible, and it should be safe to change the plugin.discovery configuration to SERVICE_LOAD. If you change the set of already-installed plugins, they may no longer be compatible, and you should repeat the above verification. If the verification fails, you must address the incompatible plugins before using the SERVICE_LOAD strategy. + +Operators: Artifact Migration + +As an operator of Connect, if you discover incompatible plugins, there are multiple ways to try to resolve the incompatibility. They are listed below from most to least preferable. + + +Upgrade your incompatible plugins to the latest release version from your plugin provider. +Contact your plugin provider and request that they migrate the plugin to be compatible, following the source migration instructions, and then upgrade to the migrated version. +Migrate the plugin artifacts yourself using the included migration script. + + +The migration script is located in bin/connect-plugin-path.sh and bin\windows\connect-plugin-path.bat of your Kafka installation. The script can migrate incompatible plugin artifacts already installed on your Connect worker's plugin.path by adding or modifying JAR or resource files. This is not suitable for environments using code-signing, as this may change the artifacts such that they will fail signature verification. View the built-in help with --help. + +To perform a migration, first use the list subcommand to get an overview of the plugins available to the script. You must tell the script where to find plugins, which can be done with the repeatable --worker-config, --plugin-path, and --plugin-location arguments. 
The script will only migrate plugins present in the paths specified, so if you add plugins to your worker's classpath, then you will need to specify those plugins via one or more --plugin-location arguments. Review Comment: > Hmmm... I agree with the reservations about modifying JAR files in-flight. Do we have to actually write to those JAR files, though? Could we create/modify a service loader manifest on the file system instead of inside a JAR file? > I know this is a little inelegant but IMO it's worth considering since it would reduce the potential for footguns related to classpath plugins and would increase the general utility of the CLI tool. If we did this, we'd end up with the same problem one iteration later. Suppose we were able to find the kafka lib directory, and knew that we were running via `kafka-run-class.sh`, so the manifest shim jar would be picked up the next time the script is run. Because it's on the classpath, we wouldn't be able to mutate it safely, so we could only append manifests by adding new shim jar
[GitHub] [kafka] guozhangwang commented on a diff in pull request #14149: HOTFIX: avoid placement of unnecessary transient standby tasks
guozhangwang commented on code in PR #14149: URL: https://github.com/apache/kafka/pull/14149#discussion_r1286454991 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -618,18 +618,24 @@ private boolean assignTasksToClients(final Cluster fullMetadata, final boolean lagComputationSuccessful = populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics); -log.info("{} members participating in this rebalance: \n{}.", -clientStates.size(), -clientStates.entrySet().stream() -.sorted(comparingByKey()) -.map(entry -> entry.getKey() + ": " + entry.getValue().consumers()) -.collect(Collectors.joining(Utils.NL))); + +log.info("{} client nodes and {} consumers participating in this rebalance: \n{}.", + clientStates.size(), + clientStates.values().stream().map(ClientState::capacity).reduce(Integer::sum), + clientStates.entrySet().stream() + .sorted(comparingByKey()) + .map(entry -> entry.getKey() + ": " + entry.getValue().consumers()) + .collect(Collectors.joining(Utils.NL))); final Set allTasks = partitionsForTask.keySet(); statefulTasks.addAll(changelogTopics.statefulTaskIds()); -log.debug("Assigning tasks {} including stateful {} to clients {} with number of replicas {}", -allTasks, statefulTasks, clientStates, numStandbyReplicas()); +log.info("Assigning stateful tasks: {}\n" + + "and stateless tasks: {}", + statefulTasks, + allTasks.removeAll(statefulTasks)); Review Comment: Won't this change the content of `allTasks`? Also `removeAll` returns a boolean. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -1063,9 +1071,13 @@ private Map> buildStandbyTaskMap(final String consum } for (final TaskId task : revokedTasks) { -if (allStatefulTasks.contains(task)) { Review Comment: Not sure I fully understand the change here either. 
Would appreciate some more explanations --- specifically, could you give a simple example in the PR description? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
gharris1727 commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1286456172 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements +Plugin Discovery + +Plugin discovery is the name for the strategy which the Connect worker uses to find plugin classes and make them accessible to configure and run in Connectors and Tasks. This is controlled by the plugin.discovery worker configuration, and has a significant impact on worker startup time. SERVICE_LOAD is the fastest strategy, but care should be taken to verify that plugins are compatible before setting this configuration to SERVICE_LOAD. + +Prior to version 3.6, this strategy was not configurable, and behaved like the ONLY_SCAN mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to HYBRID_WARN which is also compatible with all plugins, but logs a warning for all plugins which are incompatible with the other modes. For unit-test environments that use the EmbeddedConnectCluster this defaults to the HYBRID_FAIL strategy, which stops the worker with an error if an incompatible plugin is detected. Finally, the SERVICE_LOAD strategy will silently hide incompatible plugins and make them unusable. + +Verifying Plugin Compatibility + +To verify if all of your plugins are compatible, first ensure that you are using version 3.6 or later of the Connect runtime. You can then perform one of the following checks: + + +Start your worker with the default HYBRID_WARNstrategy, and WARN logs enabled for the org.apache.kafka.connect package. At least one WARN log message mentioning the plugin.discovery configuration should be printed. This log message will explicitly say that all plugins are compatible, or list the incompatible plugins. +Start your worker in a test environment with HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and all incompatible plugins will be listed in the exception. + + +If the verification step succeeds, then your current set of installed plugins are compatible, and it should be safe to change the plugin.discovery configuration to SERVICE_LOAD. If you change the set of already-installed plugins, they may no longer be compatible, and you should repeat the above verification. If the verification fails, you must address the incompatible plugins before using the SERVICE_LOAD strategy. + +Operators: Artifact Migration + +As an operator of Connect, if you discover incompatible plugins, there are multiple ways to try to resolve the incompatibility. They are listed below from most to least preferable. + + +Upgrade your incompatible plugins to the latest release version from your plugin provider. +Contact your plugin provider and request that they migrate the plugin to be compatible, following the source migration instructions, and then upgrade to the migrated version. +Migrate the plugin artifacts yourself using the included migration script. + + +The migration script is located in bin/connect-plugin-path.sh and bin\windows\connect-plugin-path.bat of your Kafka installation. The script can migrate incompatible plugin artifacts already installed on your Connect worker's plugin.path by adding or modifying JAR or resource files. This is not suitable for environments using code-signing, as this may change the artifacts such that they will fail signature verification. View the built-in help with --help. + +To perform a migration, first use the list subcommand to get an overview of the plugins available to the script. You must tell the script where to find plugins, which can be done with the repeatable --worker-config, --plugin-path, and --plugin-location arguments. 
The script will only migrate plugins present in the paths specified, so if you add plugins to your worker's classpath, then you will need to specify those plugins via one or more --plugin-location arguments. Review Comment: > Hmmm... I agree with the reservations about modifying JAR files in-flight. Do we have to actually write to those JAR files, though? Could we create/modify a service loader manifest on the file system instead of inside a JAR file? > I know this is a little inelegant but IMO it's worth considering since it would reduce the potential for footguns related to classpath plugins and would increase the general utility of the CLI tool. If we did this, we'd end up with the same problem one iteration later. Suppose we were able to find the kafka lib directory, and knew that we were running via `kafka-run-class.sh`, so the manifest shim jar would be picked up the next time the script is run. Because it's on the classpath, we wouldn't be able to mutate it safely, so we could only append manifests by adding new shim jar
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751821#comment-17751821 ] Guozhang Wang commented on KAFKA-15297: --- I think this is indeed a general issue, that state stores are initialized in the order of the topology which is essentially the "processor node order", as in ``InternalTopologyBuilder#build``. This works when a state store is only associated with one processor, or when a store is associated with multiple processors but they are built as part of a built-in operator (like a join in DSL) in which case we carefully make sure that the state store order adheres to the processor order; but in a PAPI scenario like Bruno reported in this one, all bets are off. I think a general fix would be, in the above ``build`` function, to only build the processors in the first loop pass without initializing the state stores, then determine the state store initialization order based on the built processors, and initialize the state stores in a second pass. > Cache flush order might not be topological order > - > > Key: KAFKA-15297 > URL: https://issues.apache.org/jira/browse/KAFKA-15297 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 >Reporter: Bruno Cadonna >Priority: Major > Attachments: minimal_example.png > > > The flush order of the state store caches in Kafka Streams might not > correspond to the topological order of the state stores in the topology. The > order depends on how the processors and state stores are added to the > topology. > In some cases downstream state stores might be flushed before upstream state > stores. That means, that during a commit records in upstream caches might end > up in downstream caches that have already been flushed during the same > commit. If a crash happens at that point, those records in the downstream > caches are lost. Those records are lost for two reasons: > 1. 
Records in caches are only changelogged after they are flushed from the > cache. However, the downstream caches have already been flushed and they will > not be flushed again during the same commit. > 2. The offsets of the input records that caused the records that now are > blocked in the downstream caches are committed during the same commit and so > they will not be re-processed after the crash. > An example for a topology where the flush order of the caches is wrong is the > following: > {code:java} > final String inputTopic1 = "inputTopic1"; > final String inputTopic2 = "inputTopic2"; > final String outputTopic1 = "outputTopic1"; > final String processorName = "processor1"; > final String stateStoreA = "stateStoreA"; > final String stateStoreB = "stateStoreB"; > final String stateStoreC = "stateStoreC"; > streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), > Serdes.String())) > .process( > () -> new Processor() { > private ProcessorContext context; > @Override > public void init(ProcessorContext context) { > this.context = context; > } > @Override > public void process(Record record) { > context.forward(record); > } > @Override > public void close() {} > }, > Named.as("processor1") > ) > .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String())); > streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), > Serdes.String())) > .toTable(Materialized. byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String())) > .mapValues(value -> value, Materialized. KeyValueStore byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String())) > .mapValues(value -> value, Materialized. 
KeyValueStore byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String())) > .toStream() > .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String())); > final Topology topology = streamsBuilder.build(streamsConfiguration); > topology.connectProcessorAndStateStores(processorName, stateStoreC); > {code} > This code results in the attached topology. > In the topology {{processor1}} is connected to {{stateStoreC}}. If > {{processor1}} is added to the topology before the other processors, i.e., if > the right branch of the topology is added before the left branch as in the > code above, the cache of {{stateStoreC}} is flushed before the caches of > {{stateStoreA}} and {{stateStoreB}}. > You can observe the flush order by feeding some records into the input topics > of the topology, waiting for a commit, and looking for the following log > message: > https://github.com/apache/kaf
[GitHub] [kafka] lihaosky commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor
lihaosky commented on code in PR #14139: URL: https://github.com/apache/kafka/pull/14139#discussion_r1286439107 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java: ## @@ -56,52 +67,118 @@ import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getProcessRacksForAllProcess; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomCluster; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomProcessRacks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskChangelogMapForAllTasks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTopologyGroupTaskMap; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks; +import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForChangelog; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.verifyStandbySatisfyRackReplica; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.fail; - +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(Parameterized.class) public class HighAvailabilityTaskAssignorTest { -private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs( -/*acceptableRecoveryLag*/ 100L, -/*maxWarmupReplicas*/ 2, -/*numStandbyReplicas*/ 0, -/*probingRebalanceIntervalMs*/ 60 * 1000L, -/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS -); - -private final AssignmentConfigs configWithStandbys = new AssignmentConfigs( -/*acceptableRecoveryLag*/ 100L, -/*maxWarmupReplicas*/ 2, -/*numStandbyReplicas*/ 1, -/*probingRebalanceIntervalMs*/ 60 * 1000L, -/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS -); +private AssignmentConfigs getConfigWithoutStandbys() { +return new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*maxWarmupReplicas*/ 2, +/*numStandbyReplicas*/ 0, +/*probingRebalanceIntervalMs*/ 60 * 1000L, +/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, +null, +null, +rackAwareStrategy +); +} + +private AssignmentConfigs getConfigWithStandbys() { +return 
getConfigWithStandbys(1); +} + +private AssignmentConfigs getConfigWithStandbys(final int replicaNum) { +return new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*maxWarmupReplicas*/ 2, +/*numStandbyReplicas*/ replicaNum, +/*probingRebalanceIntervalMs*/ 60 * 1000L, +/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, +null, +null, +rackAwareStrategy +); +} + +@Parameter +public boolean enableRackAwareTaskAssignor; + +private String rackAwareStrategy = StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE; + +@Before +public void setUp() { +if (enableRackAwareTaskAssignor) { +
[GitHub] [kafka] lihaosky commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor
lihaosky commented on code in PR #14139: URL: https://github.com/apache/kafka/pull/14139#discussion_r1286438784 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java: ## @@ -124,11 +133,20 @@ private static void assignActiveStatefulTasks(final SortedMap ClientState::assignActive, (source, destination) -> true ); + +if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) { Review Comment: I think currently it's because there are tests that have not been modified yet and still pass null. Can I remove this in my next PR, after I have modified all `HAAssignor` tests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15281) Implement the groupMetadata Consumer API
[ https://issues.apache.org/jira/browse/KAFKA-15281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15281: - Assignee: (was: Kirk True) > Implement the groupMetadata Consumer API > > > Key: KAFKA-15281 > URL: https://issues.apache.org/jira/browse/KAFKA-15281 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Original Estimate: 504h > Remaining Estimate: 504h > > The threading refactor project needs to implement the {{groupMetadata()}} API > call once support for the KIP-848 protocol is implemented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15282) Implement client support for KIP-848 client-side assignors
[ https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15282: - Assignee: (was: Kirk True) > Implement client support for KIP-848 client-side assignors > -- > > Key: KAFKA-15282 > URL: https://issues.apache.org/jira/browse/KAFKA-15282 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > > The client-side assignor provides the logic for the partition assignments > instead of on the server. Client-side assignment is the main approach used by > the “old protocol” for divvying up partitions. While the “new protocol” > favors server-side assignors, the client-side assignor will continue to be > used for backward compatibility, including KSQL, Connect, etc. > Note: I _*think*_ that the client-side assignor logic and the reconciliation > logic can remain separate from each other. We should strive to keep the two > pieces unencumbered, unless it’s unavoidable. > This task includes: > * Validate the client’s configuration for assignor selection > * Integrate with the new {{PartitionAssignor}} interface to invoke the logic > from the user-provided assignor implementation > * Implement the necessary logic around the request/response from the > {{ConsumerGroupPrepareAssignment}} RPC call using the information from the > {{PartitionAssignor}} above > * Implement the necessary logic around the request/response from the > {{ConsumerGroupInstallAssignment}} RPC call, again using the information > calculated by the {{PartitionAssignor}} > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15280) Implement client support for KIP-848 server-side assignors
[ https://issues.apache.org/jira/browse/KAFKA-15280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15280: - Assignee: (was: Kirk True) > Implement client support for KIP-848 server-side assignors > -- > > Key: KAFKA-15280 > URL: https://issues.apache.org/jira/browse/KAFKA-15280 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > > This includes: > * Validate the client’s configuration for assignor selection > * Validate the request/response from the {{ConsumerGroupHeartbeat}} RPC call > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol
[ https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15284: - Assignee: (was: Kirk True) > Implement ConsumerGroupProtocolVersionResolver to determine consumer group > protocol > --- > > Key: KAFKA-15284 > URL: https://issues.apache.org/jira/browse/KAFKA-15284 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > > At client initialization, we need to determine which of the > {{ConsumerDelegate}} implementations to use: > # {{LegacyKafkaConsumerDelegate}} > # {{AsyncKafkaConsumerDelegate}} > There are conditions defined by KIP-848 that determine client eligibility to > use the new protocol. This will be modeled by the—deep > breath—{{ConsumerGroupProtocolVersionResolver}}. > Known tasks: > * Determine at what point in the {{Consumer}} initialization the network > communication should happen > * Determine what RPCs to invoke in order to determine eligibility (API > versions, IBP version, etc.) > * Implement the network client lifecycle (startup, communication, shutdown, > etc.) > * Determine the fallback path in case the client is not eligible to use the > protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15279) Implement client support for KIP-848 assignment RPCs
[ https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15279: - Assignee: (was: Kirk True) > Implement client support for KIP-848 assignment RPCs > > > Key: KAFKA-15279 > URL: https://issues.apache.org/jira/browse/KAFKA-15279 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > > The protocol introduces three new RPCs that the client uses to communicate > with the broker: > # > [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI] > # > [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI] > # > [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI] > Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to > implement the ConsumerGroupAssignmentRequestManager to handle the second and > third RPCs on the above list. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15278: - Assignee: (was: Kirk True) > Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > > The protocol introduces three new RPCs that the client uses to communicate > with the broker: > # > [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI] > # > [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI] > # > [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI] > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions
[ https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15306: -- Labels: consumer-threading-refactor (was: client consumer) > Integrate committed offsets logic when updating fetching positions > -- > > Key: KAFKA-15306 > URL: https://issues.apache.org/jira/browse/KAFKA-15306 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > > Integrate refreshCommittedOffsets logic, currently performed by the > coordinator, into the update fetch positions performed on every iteration of > the async consumer poll loop. This should rely on the CommitRequestManager to > perform the request based on the refactored model, but it should reuse the > logic for processing the committed offsets and updating the positions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing
[ https://issues.apache.org/jira/browse/KAFKA-15304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15304: -- Labels: consumer-threading-refactor (was: ) > CompletableApplicationEvents aren't being completed when the consumer is > closing > > > Key: KAFKA-15304 > URL: https://issues.apache.org/jira/browse/KAFKA-15304 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > If the background thread is closed before ingesting all ApplicationEvents, we > should drain the background queue and try to cancel these events before > closing. We can try to process these events before closing down the consumer; > however, we assume that when the user issues a close command, the consumer > should be shut down promptly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions
[ https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15306: -- Labels: client consumer (was: client consumer kip-945) > Integrate committed offsets logic when updating fetching positions > -- > > Key: KAFKA-15306 > URL: https://issues.apache.org/jira/browse/KAFKA-15306 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: client, consumer > > Integrate refreshCommittedOffsets logic, currently performed by the > coordinator, into the update fetch positions performed on every iteration of > the async consumer poll loop. This should rely on the CommitRequestManager to > perform the request based on the refactored model, but it should reuse the > logic for processing the committed offsets and updating the positions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15305: -- Component/s: clients > The background thread should try to process the remaining task until the > shutdown timer is expired > -- > > Key: KAFKA-15305 > URL: https://issues.apache.org/jira/browse/KAFKA-15305 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > While working on https://issues.apache.org/jira/browse/KAFKA-15304 > close() API supplies a timeout parameter so that the consumer can have a > grace period to process things before shutting down. The background thread > currently doesn't do that: when close() is initiated, it will immediately > close all of its dependencies. > > This might not be desirable because there could be remaining tasks to be > processed before closing. Maybe the correct thing to do is to first stop > accepting API requests; second, let runOnce() continue to run until the > shutdown timer expires; then we can force-close all of its dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15305: -- Labels: consumer-threading-refactor (was: ) > The background thread should try to process the remaining task until the > shutdown timer is expired > -- > > Key: KAFKA-15305 > URL: https://issues.apache.org/jira/browse/KAFKA-15305 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > While working on https://issues.apache.org/jira/browse/KAFKA-15304 > close() API supplies a timeout parameter so that the consumer can have a > grace period to process things before shutting down. The background thread > currently doesn't do that: when close() is initiated, it will immediately > close all of its dependencies. > > This might not be desirable because there could be remaining tasks to be > processed before closing. Maybe the correct thing to do is to first stop > accepting API requests; second, let runOnce() continue to run until the > shutdown timer expires; then we can force-close all of its dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing
[ https://issues.apache.org/jira/browse/KAFKA-15304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15304: -- Component/s: clients > CompletableApplicationEvents aren't being completed when the consumer is > closing > > > Key: KAFKA-15304 > URL: https://issues.apache.org/jira/browse/KAFKA-15304 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Minor > > If the background thread is closed before ingesting all ApplicationEvents, we > should drain the background queue and try to cancel these events before > closing. We can try to process these events before closing down the consumer; > however, we assume that when the user issues a close command, the consumer > should be shut down promptly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions
[ https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15306: -- Component/s: clients consumer > Integrate committed offsets logic when updating fetching positions > -- > > Key: KAFKA-15306 > URL: https://issues.apache.org/jira/browse/KAFKA-15306 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: client, consumer > > Integrate refreshCommittedOffsets logic, currently performed by the > coordinator, into the update fetch positions performed on every iteration of > the async consumer poll loop. This should rely on the CommitRequestManager to > perform the request based on the refactored model, but it should reuse the > logic for processing the committed offsets and updating the positions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mjsax commented on a diff in pull request #14163: MINOR: Improve JavaDocs of KafkaStreams `context.commit()`
mjsax commented on code in PR #14163: URL: https://github.com/apache/kafka/pull/14163#discussion_r1286429053 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java: ## @@ -805,12 +805,14 @@ static Map> getTopologyGroupTaskMap() { return Collections.singletonMap(SUBTOPOLOGY_0, Collections.singleton(new TaskId(1, 1))); } -static void verifyStandbySatisfyRackReplica(final Set taskIds, +static void verifyStandbySatisfyRackReplica( +final Set taskIds, Review Comment: Unrelated side cleanup to fix formatting -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
kirktrue commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1286379164 ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -77,17 +77,33 @@ public class CommonClientConfigs { public static final String DEFAULT_CLIENT_RACK = ""; public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; -public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker."; +public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. " + +"This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. " + +"This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the reconnect.backoff.max.ms value."; public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms"; -public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms."; +public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. " + +"If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. 
After calculating the backoff increase, 20% random jitter is added to avoid connection storms."; public static final String RETRIES_CONFIG = "retries"; public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." + " It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request."; public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; -public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios."; +public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. " + +"This avoids repeatedly sending requests in a tight loop under some failure scenarios. This value is the initial backoff value and will increase exponentially for each failed request, " + +"up to the retry.backoff.max.ms value."; +public static final Long DEFAULT_RETRY_BACKOFF_MS = 100L; + +public static final String RETRY_BACKOFF_MAX_MS_CONFIG = "retry.backoff.max.ms"; +public static final String RETRY_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when retrying a request to the broker that has repeatedly failed. " + +"If provided, the backoff per client will increase exponentially for each failed request, up to this maximum. To prevent all clients from being synchronized upon retry, " + +"a randomized jitter with a factor of 0.2 will be applied to the backoff, resulting in the backoff falling within a range between 20% below and 20% above the computed value. 
" + +"If retry.backoff.ms is set to be higher than retry.backoff.max.ms, then retry.backoff.max.ms will be used as a constant backoff from the beginning without any exponential increase"; Review Comment: Interesting... ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java: ## @@ -372,10 +372,16 @@ private static ConfigDef config(Crypto crypto) { CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC) .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, -100L, +CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS, atLeast(0L), ConfigDef.Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) +.define(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG, Review Comment: Does `DistributedConfig` need to call that `CommonClientConfigs.warnDisablingExponentialBackoff()` method li
[GitHub] [kafka] gharris1727 commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
gharris1727 commented on PR #14064: URL: https://github.com/apache/kafka/pull/14064#issuecomment-1668643267 I've also added a new warning to stdout which alerts users when they have incompatible plugins on the classpath that will not be auto-migrated. Currently without any manifests in the runtime (they're in a different PR) this is what it looks like: ``` 46 plugins on the classpath are not compatible, and will not be auto-migrated. Please move these plugins to a plugin path location. org.apache.kafka.connect.transforms.ReplaceField$Value org.apache.kafka.connect.converters.FloatConverter org.apache.kafka.connect.converters.ShortConverter org.apache.kafka.connect.transforms.predicates.TopicNameMatches Total plugins: 1 Loadable plugins: 1 Compatible plugins: 1 Total locations:1 Compatible locations: 1 All locations compatible? true ``` They aren't part of the main table, and they aren't part of the summary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
gharris1727 commented on PR #14064: URL: https://github.com/apache/kafka/pull/14064#issuecomment-1668640505 > I think our workaround with a special path of classpath to denote classpath plugins in PluginUtils::classpathPluginSource should be improved. I built the basic auth extension, created a directory named classpath, moved the JAR file for the extension into that directory, and then ran bin/connect-plugin-path.sh list --plugin-location classpath. The end result was fairly confusing: Thanks for finding that, I've changed it so that PluginSource uses `null` as a sentinel value for the classpath, and changed all of the places it's printed to re-inject the `"classpath"` string after all the control flow has used `null`. I've replaced the makeUnique hack with Lists and counting instead of trying to make the ManifestEntry objects completely unique. > I've also finally gotten around to looking at the tests. It seems like we have a great foundation of scenarios to build on, but the assertions we're making are fairly minor. Can we add some coverage to verify, e.g., how many plugin rows are listed, the (partial or complete) contents of the post-table summary, and which plugins are described as being migrated vs. non-migrated? Yes the assertions are pretty sparse. I've been avoiding parsing the output while it was still in flux, but I think the format is starting to settle down now. I'll go through and see what I can add. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
gharris1727 commented on code in PR #14064: URL: https://github.com/apache/kafka/pull/14064#discussion_r1286420767 ## tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java: ## @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.ArgumentGroup; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.PluginScanResult; +import org.apache.kafka.connect.runtime.isolation.PluginSource; +import org.apache.kafka.connect.runtime.isolation.PluginType; +import org.apache.kafka.connect.runtime.isolation.PluginUtils; +import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; +import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ConnectPluginPath { 
+ +private static final String MANIFEST_PREFIX = "META-INF/services/"; +private static final Object[] LIST_TABLE_COLUMNS = { +"pluginName", +"firstAlias", +"secondAlias", +"pluginVersion", +"pluginType", +"isLoadable", +"hasManifest", +"pluginLocation" // last because it is least important and most repetitive +}; + +public static void main(String[] args) { +Exit.exit(mainNoExit(args, System.out, System.err)); +} + +public static int mainNoExit(String[] args, PrintStream out, PrintStream err) { +ArgumentParser parser = parser(); +try { +Namespace namespace = parser.parseArgs(args); +Config config = parseConfig(parser, namespace, out); +runCommand(config); +return 0; +} catch (ArgumentParserException e) { +parser.handleError(e); +return 1; +} catch (TerseException e) { +err.println(e.getMessage()); +return 2; +} catch (Throwable e) { +err.println(e.getMessage()); +err.println(Utils.stackTrace(e)); +return 3; +} +} + +private static ArgumentParser parser() { +ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-plugin-path") +.defaultHelp(true) +.description("Manage plugins on the Connect plugin.path"); + +ArgumentParser listCommand = parser.addSubparsers() +.description("List information about plugins contained within the specified plugin locations") +.dest("subcommand") +.addParser("list"); + +ArgumentParser[] subparsers = new ArgumentParser[] { +listCommand, +}; + +for (ArgumentParser subparser : subparsers) { +ArgumentGroup pluginProviders = subparser.addArgumentGroup("plugin providers"); +pluginPro
[GitHub] [kafka] jsancio opened a new pull request, #14162: KAFKA-15312; Force channel before atomic file move
jsancio opened a new pull request, #14162: URL: https://github.com/apache/kafka/pull/14162 On ext4 file systems we have seen snapshots with zero-length files. This is possible if the file is closed and moved before forcing the channel to write to disk. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move
[ https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-15312: --- Description: Not all file systems fsync to disk on close. For KRaft to guarantee that the data has made it to disk before calling rename, it needs to make sure that the file has been fsynced. We have seen cases where the snapshot file has zero-length data on the ext4 file system. {quote} "Delayed allocation" means that the filesystem tries to delay the allocation of physical disk blocks for written data for as long as possible. This policy brings some important performance benefits. Many files are short-lived; delayed allocation can keep the system from writing fleeting temporary files to disk at all. And, for longer-lived files, delayed allocation allows the kernel to accumulate more data and to allocate the blocks for data contiguously, speeding up both the write and any subsequent reads of that data. It's an important optimization which is found in most contemporary filesystems. But, if blocks have not been allocated for a file, there is no need to write them quickly as a security measure. Since the blocks do not yet exist, it is not possible to read somebody else's data from them. So ext4 will not (cannot) write out unallocated blocks as part of the next journal commit cycle. Those blocks will, instead, wait until the kernel decides to flush them out; at that point, physical blocks will be allocated on disk and the data will be made persistent. The kernel doesn't like to let file data sit unwritten for too long, but it can still take a minute or so (with the default settings) for that data to be flushed - far longer than the five seconds normally seen with ext3. And that is why a crash can cause the loss of quite a bit more data when ext4 is being used. 
{quote} from: [https://lwn.net/Articles/322823/] {quote}auto_da_alloc ( * ), noauto_da_alloc Many broken applications don't use fsync() when replacing existing files via patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", "foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will detect the replace-via-rename and replace-via-truncate patterns and force that any delayed allocation blocks are allocated such that at the next journal commit, in the default data=ordered mode, the data blocks of the new file are forced to disk before the rename() operation is committed. This provides roughly the same level of guarantees as ext3, and avoids the "zero-length" problem that can happen when a system crashes before the delayed allocation blocks are forced to disk. {quote} from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html] was: Not all file system fsync to disk on close. For KRaft to guarantee that the data has made it to disk before calling rename it needs to make sure that the file has been fsync. We have seen cases were the snapshot file has zero-length data on ext4 file system. {quote} "Delayed allocation" means that the filesystem tries to delay the allocation of physical disk blocks for written data for as long as possible. This policy brings some important performance benefits. Many files are short-lived; delayed allocation can keep the system from writing fleeting temporary files to disk at all. And, for longer-lived files, delayed allocation allows the kernel to accumulate more data and to allocate the blocks for data contiguously, speeding up both the write and any subsequent reads of that data. It's an important optimization which is found in most contemporary filesystems. But, if blocks have not been allocated for a file, there is no need to write them quickly as a security measure. 
Since the blocks do not yet exist, it is not possible to read somebody else's data from them. So ext4 will not (cannot) write out unallocated blocks as part of the next journal commit cycle. Those blocks will, instead, wait until the kernel decides to flush them out; at that point, physical blocks will be allocated on disk and the data will be made persistent. The kernel doesn't like to let file data sit unwritten for too long, but it can still take a minute or so (with the default settings) for that data to be flushed - far longer than the five seconds normally seen with ext3. And that is why a crash can cause the loss of quite a bit more data when ext4 is being used. {quote} from: [https://lwn.net/Articles/322823/] {quote}auto_da_alloc (*), noauto_da_alloc Many broken applications don't use fsync() when replacing existing files via patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", "foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will detect the replace-via-rename and replace-via-truncate patterns and force th
[GitHub] [kafka] mjsax commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor
mjsax commented on code in PR #14139: URL: https://github.com/apache/kafka/pull/14139#discussion_r1286401088 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java: ## @@ -145,6 +163,14 @@ private void assignStandbyReplicaTasks(final TreeMap clientSt ClientState::assignStandby, standbyTaskAssignor::isAllowedTaskMovement ); + +if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) { Review Comment: As above. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java: ## @@ -208,19 +234,27 @@ private static boolean shouldMoveATask(final ClientState sourceClientState, } private static void assignStatelessActiveTasks(final TreeMap clientStates, - final Iterable statelessTasks) { + final Iterable statelessTasks, + final Optional rackAwareTaskAssignor) { final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad = new ConstrainedPrioritySet( (client, task) -> true, client -> clientStates.get(client).activeTaskLoad() ); statelessActiveTaskClientsByTaskLoad.offerAll(clientStates.keySet()); +final SortedSet sortedTasks = new TreeSet<>(); for (final TaskId task : statelessTasks) { +sortedTasks.add(task); final UUID client = statelessActiveTaskClientsByTaskLoad.poll(task); final ClientState state = clientStates.get(client); state.assignActive(task); statelessActiveTaskClientsByTaskLoad.offer(client); } + +if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() && rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) { Review Comment: As above. 
## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java: ## @@ -56,52 +67,118 @@ import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getProcessRacksForAllProcess; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomCluster; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomProcessRacks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskChangelogMapForAllTasks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTopologyGroupTaskMap; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForChangelog; +import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.verifyStandbySatisfyRackReplica; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.fail; - +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@RunWith(Parameterized.
[GitHub] [kafka] philipnee closed pull request #14154: [TEST] Testing build failures for the commit
philipnee closed pull request #14154: [TEST] Testing build failures for the commit URL: https://github.com/apache/kafka/pull/14154 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
ruslankrivoshein commented on code in PR #13562: URL: https://github.com/apache/kafka/pull/13562#discussion_r1286370800 ## tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java: ## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.PartitionFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter; +import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.server.util.TopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.function.IntFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class GetOffsetShell { +private static final Pattern 
TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?"); + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println("Error occurred: " + e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println("Error occurred: " + e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException { +GetOffsetShell getOffsetShell = new GetOffsetShell(); + +GetOffsetShellOptions options = new GetOffsetShellOptions(args); + +Map partitionOffsets = getOffsetShell.fetchOffsets(options); + +for (Map.Entry entry : partitionOffsets.entrySet()) { +TopicPartition topic = entry.getKey(); + +System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()})); +} +} + +private static class GetOffsetShellOptions extends CommandDefaultOptions { +private final OptionSpec brokerListOpt; +private final OptionSpec bootstrapServerOpt; +private final OptionSpec topicPartitionsOpt; +private final OptionSpec topicOpt; +private final OptionSpec partitionsOpt; +private final OptionSpec timeOpt; +private final OptionSpec commandConfigOpt; +private final OptionSpec effectiveBrokerListOpt; +private final OptionSpecBuilder excludeInternalTopicsOpt; + +public GetOffsetShellOptions(String[] args) throws TerseException { +super(args); + +brokerListOpt = parser.accepts("br
[GitHub] [kafka] C0urante commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1286347860 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,15 +290,52 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } -return primaryStore.set(values, (primaryWriteError, ignored) -> { +List partitionsWithTombstoneOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() == null) +.map(Map.Entry::getKey).collect(Collectors.toList()); + +Map tombstoneOffsets = new HashMap<>(); +for (ByteBuffer partition : partitionsWithTombstoneOffsets) { +tombstoneOffsets.put(partition, null); +} +Map regularOffsets = values.entrySet().stream() +.filter(offsetEntry -> offsetEntry.getValue() != null) +.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + +// If the supplied offsets contain tombstone values, then tombstone offsets are extracted first, +// written to the secondary store in a synchronous manner and then to the primary store. +// This is because, if a tombstone offset is successfully written to the per-connector offsets topic, +// but cannot be written to the global offsets topic, then the global offsets topic will still contain that +// source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker, +// if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the +// offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found. 
+CompletableFuture offsetWriteFuture = CompletableFuture.completedFuture(null); +if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { +offsetWriteFuture.thenApply(v -> { +Future secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>()); +try { +if (exactlyOnce) { +secondaryWriteFuture.get(); +} else { +secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS); +} +} catch (ExecutionException e) { +log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause()); +} catch (Exception e) { +log.error("{} Got Exception when trying to flush tombstone(s) offsets to secondary store", this, e); +} Review Comment: Why are we catching these exceptions at all? Shouldn't we be throwing them so that the offset commit fails? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on a diff in pull request #14096: KAFKA-14595 AdminUtils rewritten in java
nizhikov commented on code in PR #14096: URL: https://github.com/apache/kafka/pull/14096#discussion_r1286331468 ## server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.admin; + +import org.apache.kafka.common.errors.InvalidPartitionsException; +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.server.common.AdminOperationException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +public class AdminUtils { +static final Random RAND = new Random(); + +public static final String ADMIN_CLIENT_ID = "__admin_client"; + +public static Map> assignReplicasToBrokers(Collection brokerMetadatas, + int nPartitions, + int replicationFactor) { +return assignReplicasToBrokers(brokerMetadatas, nPartitions, replicationFactor, -1, -1); +} + +/** + * There are 3 goals of replica assignment: + * + * + * Spread the replicas evenly among brokers. + * For partitions assigned to a particular broker, their other replicas are spread over the other brokers. + * If all brokers have rack information, assign the replicas for each partition to different racks if possible + * + * + * To achieve this goal for replica assignment without considering racks, we: + * + * Assign the first replica of each partition by round-robin, starting from a random position in the broker list. + * Assign the remaining replicas of each partition with an increasing shift. + * + * + * Here is an example of assigning + * + * broker-0broker-1broker-2broker-3broker-4 + * p0 p1 p2 p3 p4 (1st replica) + * p5 p6 p7 p8 p9 (1st replica) + * p4 p0 p1 p2 p3 (2nd replica) + * p8 p9 p5 p6 p7 (2nd replica) + * p3 p4 p0 p1 p2 (3nd replica) + * p7 p8 p9 p5 p6 (3nd replica) + * + * + * + * To create rack aware assignment, this API will first create a rack alternated broker list. 
For example, + * from this brokerID -> rack mapping: + * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1" + * + * + * The rack alternated list will be: + * + * 0, 3, 1, 5, 4, 2 + * + * + * Then an easy round-robin assignment can be applied. Assume 6 partitions with replication factor of 3, the assignment + * will be: + * + * 0 -> 0,3,1 + * 1 -> 3,1,5 + * 2 -> 1,5,4 + * 3 -> 5,4,2 + * 4 -> 4,2,0 + * 5 -> 2,0,3 + * + * + * Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start + * shifting the followers. This is to ensure we will not always get the same set of sequences. + * In this case, if there is another partition to assign (partition #6), the assignment will be: + * + * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0) + * + * + * The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated + * broker list. For rest of the replicas, it will be biased towards brokers on racks that do not have + * any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on + * the broker list. + * + * + * + * As the result, if the number of replicas is equal to or greater than the number of racks, it will ensure that + * each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect + * s
[GitHub] [kafka] C0urante commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
C0urante commented on code in PR #14064: URL: https://github.com/apache/kafka/pull/14064#discussion_r1286292051 ## tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java: ## @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.ArgumentGroup; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.PluginScanResult; +import org.apache.kafka.connect.runtime.isolation.PluginSource; +import org.apache.kafka.connect.runtime.isolation.PluginType; +import org.apache.kafka.connect.runtime.isolation.PluginUtils; +import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; +import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ConnectPluginPath { 
+ +private static final String MANIFEST_PREFIX = "META-INF/services/"; +private static final Object[] LIST_TABLE_COLUMNS = { +"pluginName", +"firstAlias", +"secondAlias", +"pluginVersion", +"pluginType", +"isLoadable", +"hasManifest", +"pluginLocation" // last because it is least important and most repetitive +}; + +public static void main(String[] args) { +Exit.exit(mainNoExit(args, System.out, System.err)); +} + +public static int mainNoExit(String[] args, PrintStream out, PrintStream err) { +ArgumentParser parser = parser(); +try { +Namespace namespace = parser.parseArgs(args); +Config config = parseConfig(parser, namespace, out); +runCommand(config); +return 0; +} catch (ArgumentParserException e) { +parser.handleError(e); +return 1; +} catch (TerseException e) { +err.println(e.getMessage()); +return 2; +} catch (Throwable e) { +err.println(e.getMessage()); +err.println(Utils.stackTrace(e)); +return 3; +} +} + +private static ArgumentParser parser() { +ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-plugin-path") +.defaultHelp(true) +.description("Manage plugins on the Connect plugin.path"); + +ArgumentParser listCommand = parser.addSubparsers() +.description("List information about plugins contained within the specified plugin locations") +.dest("subcommand") +.addParser("list"); + +ArgumentParser[] subparsers = new ArgumentParser[] { +listCommand, +}; + +for (ArgumentParser subparser : subparsers) { +ArgumentGroup pluginProviders = subparser.addArgumentGroup("plugin providers"); +pluginProvid
[GitHub] [kafka] gharris1727 commented on pull request #12806: KAFKA-14345: Fix flakiness with more accurate bound in (Dynamic)ConnectionQuotaTest
gharris1727 commented on PR #12806: URL: https://github.com/apache/kafka/pull/12806#issuecomment-1668440375 Hey @divijvaidya . > aren't we then accepting the current behaviour of quotas implementation as expected behaviour i.e. the deviation could be +- epsilon and epsilon could exceed the thresholds (max.connection.creation.rate) set for a windowing period. It is not possible to eliminate the deviation visible to the outside observer, and these tests will always need to include an error term. Here's what the error bounds will be for the default window configuration and various observation times with the variable-width and fixed-width windowing algorithms: ``` millis variable-width fixed-width 110012.4446 2.2 200011.736842105263158 2.2 220011.736842105263158 1.5714285714285714 300011.5172413793103448 1.5714285714285714 330011.5172413793103448 1.375 400011.4102564102564104 1.375 440011.4102564102564104 1.2790697674418605 500011.346938775510204 1.2790697674418605 550011.346938775510204 1.2223 600011.305084745762712 1.2223 660011.305084745762712 1.1846153846153846 700011.2753623188405796 1.1846153846153846 770011.2753623188405796 1.1578947368421053 800011.2531645569620253 1.1578947368421053 880011.2531645569620253 1.1379310344827587 900011.2359550561797752 1.1379310344827587 990011.2359550561797752 1.1224489795918366 ``` This PR implements the variable-width limits because the variable-width algorithm is currently on trunk, not because it is more accepted or correct. I'm personally ambivalent about which algorithm should be used. This PR replaces the un-motivated and incorrect hardcoded constants with computed bounds, and that is beneficial with or without the fixed-width algorithm. > Alternatively, the fix i.e. https://github.com/apache/kafka/pull/12045 doesn't require a KIP (it doesn't change any public interfaces). Please feel free to pick up that PR (or duplicate it). I won't have time any time soon to work on it. I will be happy to provide a review. 
If you don't have time or interest to work on that PR and/or KIP, then I think a test-only fix is appropriate until someone is interested in picking up the PR. The flaky tests have already made us aware of the odd behavior of the variable-width algorithm, so more ongoing failures aren't helpful. > Having said that, the answer might be that the current behaviour is accepted behaviour. In which case, I would be comfortable with this change if it is accompanied by a change in docs explaining the current expectations from the windowing algorithm so that the users at least know what their expectation from the quota implementation should be. I don't think that what this PR addresses needs to be explained in the documentation. * The existing quotas documentation is very general, and focuses on the strategic usage of quotas. This extremely specific error bound would be out of place among the other documentation. * How users choose their rate limit depends on so many more important factors than this error term (such as network environment, hardware, etc.) that I think mentioning the error factor specifically would mislead users into thinking it is more important than it is. * Users will already be compensating for this error term when setting their limits. If someone observes that the effective rate limit (either for bursts or steady-state flows) is too high and affects their cluster health, they will lower their configured rate limit to compensate. They may be blaming the wrong thing (their hardware vs the rate limit algorithm) but the effect is still the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
C0urante commented on code in PR #14064: URL: https://github.com/apache/kafka/pull/14064#discussion_r1286274051 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ## @@ -206,6 +207,12 @@ public static Set pluginLocations(String pluginPath) { for (String path : pluginPathElements) { try { Path pluginPathElement = Paths.get(path).toAbsolutePath(); +if (!pluginPath.isEmpty()) { Review Comment: ```suggestion if (pluginPath.isEmpty()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)
C0urante commented on code in PR #14068: URL: https://github.com/apache/kafka/pull/14068#discussion_r1286204067 ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements +Plugin Discovery + +Plugin discovery is the name for the strategy which the Connect worker uses to find plugin classes and make them accessible to configure and run in connectors. This is controlled by the plugin.discovery worker configuration, and has a significant impact on worker startup time. service_load is the fastest strategy, but will hide incompatible plugins and may make some unusable. Care should be taken to verify that plugins are compatible before setting this configuration to service_load. + +Prior to version 3.6, this strategy was not configurable, and behaved like the only_scan mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to hybrid_warn which is also compatible with all plugins, but logs a warning for plugins which are incompatible with service_load. The hybrid_fail strategy stops the worker with an error if a plugin incompatible with service_load is detected, asserting that all plugins are compatible. Finally, the service_load strategy will hide incompatible plugins in the REST API, and may make some unusable entirely. Review Comment: I don't love the level of detail in "hide incompatible plugins in the REST API, and may make some unusable entirely", since this isn't really behavior that we've implemented intentionally. I'd rather be more general here and just say "may make some plugins unusable"; IMO the term "unusable" gives us enough wiggle room that it covers both "completely" unusable (i.e., unloadable) and "partially" unusable (i.e., hidden but loadable). ## docs/connect.html: ## @@ -543,6 +543,67 @@ ACL requirements +Plugin Discovery + +Plugin discovery is the name for the strategy which the Connect worker uses to find plugin classes and make them accessible to configure and run in Connectors and Tasks. 
This is controlled by the plugin.discovery worker configuration, and has a significant impact on worker startup time. SERVICE_LOAD is the fastest strategy, but care should be taken to verify that plugins are compatible before setting this configuration to SERVICE_LOAD. + +Prior to version 3.6, this strategy was not configurable, and behaved like the ONLY_SCAN mode which is compatible with all plugins. For version 3.6 and later, this mode defaults to HYBRID_WARN which is also compatible with all plugins, but logs a warning for all plugins which are incompatible with the other modes. For unit-test environments that use the EmbeddedConnectCluster this defaults to the HYBRID_FAIL strategy, which stops the worker with an error if an incompatible plugin is detected. Finally, the SERVICE_LOAD strategy will silently hide incompatible plugins and make them unusable. Review Comment: > I think that including "incompatible with service_load" is less correct but less confusing. My take is that in general, it's okay to provide information that is not comprehensive but, within a limited scope, accurate. The idea is to only cover scenarios that are highly relevant to users. For example: yes, it's technically correct that plugins without manifests are incompatible with `hybrid_warn`, but since the ultimate goal of this feature is to progress towards `service_load`, it's okay if we only provide information with that context in mind. ## docs/connect.html: ## @@ -577,7 +638,11 @@ Developing a Simple Co Connector Example -We'll cover the SourceConnector as a simple example. SinkConnector implementations are very similar. Start by creating the class that inherits from SourceConnector and add a field that will store the configuration information to be propagated to the task(s) (the topic to send data to, and optionally - the filename to read from and the maximum batch size): +We'll cover the SourceConnector as a simple example. SinkConnector implementations are very similar. 
Pick a package and class name, these examples will use the FileStreamSourceConnector but substitute your own class name where appropriate. In order to make the plugin discoverable at runtime, add a ServiceLoader manifest to your resources in META-INF/services/org.apache.kafka.connect.source.SourceConnector with your fully-qualified class name on a single line: + +com.example.FileStreamSourceConnector + +Create a class that inherits from SourceConnector and add a field that will store the configuration information to be propagated to the task(s) (the topic to send data to, and optionally - the filename to read from and the maximum batch size): Review Comment: Worth adding the `package com.example` directive to the `public class FileStreamSourceConnector ...` code snippet, since the package name serves a f
[GitHub] [kafka] mjsax merged pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
mjsax merged PR #14150: URL: https://github.com/apache/kafka/pull/14150 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
mjsax commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1286245745 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, +null, Review Comment: ```suggestion null, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, +null, +Importance.MEDIUM, Review Comment: ```suggestion Importance.MEDIUM, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, +null, +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC) +.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, +Type.STRING, +RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, +in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, Type.LIST, Collections.emptyList(), atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE), Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_TAGS_DOC) +.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, +Type.INT, +null, +Importance.MEDIUM, Review Comment: ```suggestion Importance.MEDIUM, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends 
AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, +null, +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC) +.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, +Type.STRING, +RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, +in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, Type.LIST, Collections.emptyList(), atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE), Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_TAGS_DOC) +.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, +Type.INT, +null, Review Comment: ```suggestion null, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, Review Comment: ```suggestion Type.INT, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG
[GitHub] [kafka] rondagostino commented on a diff in pull request #14160: KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration
rondagostino commented on code in PR #14160: URL: https://github.com/apache/kafka/pull/14160#discussion_r1286234750 ## docs/ops.html: ## @@ -3778,6 +3778,14 @@ Migrating brokers to KRaft Each broker is restarted with a KRaft configuration until the entire cluster is running in KRaft mode. + Reverting to ZooKeeper mode During the Migration +While the cluster is still in migration mode, it is possible to revert to ZK mode. In order to do this: + + One by one, take each KRaft broker down, and restart the broker as KRaft. Review Comment: s/restart the broker as KRaft/restart the broker as ZooKeeper/ ## docs/ops.html: ## @@ -3615,7 +3615,7 @@ The following features are not yet supported for ZK to KRaft migrations: -Downgrading to ZooKeeper mode during or after the migration +Reverting to ZooKeeper mode after the migration Other features not yet supported in KRaft Review Comment: I'm not sure we should say "Reverting to ZooKeeper mode after the migration" since the implication is that this isn't supported **yet**. In fact it will never be supported. So maybe this should just point to the list of unsupported features only. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #14155: MINOR: update Kafka Streams state.dir doc
mjsax commented on PR #14155: URL: https://github.com/apache/kafka/pull/14155#issuecomment-1668327508 Cherry-picked to `3.5, [...], 2.8` branches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.
[ https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751761#comment-17751761 ] Matthias J. Sax commented on KAFKA-15302: - Thanks for providing more details; the `delete()` call that interleaved with calls to the all-iterator should be the root cause; the delete call was missing in the original description. So we now know where the flush/evict is coming from. Thanks for confirming! Still not sure how to fix it though right now. [~guozhang] any ideas? > Stale value returned when using store.all() in punctuation function. > > > Key: KAFKA-15302 > URL: https://issues.apache.org/jira/browse/KAFKA-15302 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Jinyong Choi >Priority: Major > > When using the store.all() function within the Punctuation function of > this.context.schedule, the previous value is returned. In other words, even > though the value has been stored from 1 to 2, it doesn't return 2; instead, > it returns 1. > In the provided test code, you can see the output 'BROKEN !!!', and while > this doesn't occur 100% of the time, by adding logs, it's evident that during > the while loop after all() is called, the cache is flushed. As a result, the > named cache holds a null value, causing the return of a value from RocksDB. > This is observed as the value after the .get() call is different from the > expected value. This is possibly due to the consistent read functionality of > RocksDB, although the exact cause is not certain. > Of course, if you perform {{store.flush()}} before {{all()}} there won't be > any errors. 
> > * test code (forked from balajirrao and modified for this) > [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main] > > {code:java} > private void forwardAll(final long timestamp) { > // > System.err.println("forwardAll Start"); KeyValueIterator Integer> kvList = this.kvStore.all(); > while (kvList.hasNext()) { > KeyValue entry = kvList.next(); > final Record msg = new Record<>(entry.key, > entry.value, context.currentSystemTimeMs()); > final Integer storeValue = this.kvStore.get(entry.key); if > (entry.value != storeValue) { > System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: > " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + > " but KeyValueIterator value: " + entry.value); > throw new RuntimeException("Broken!"); > } this.context.forward(msg); > } > kvList.close(); > } > {code} > * log file (add log in stream source) > > {code:java} > # console log > sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1" > [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20) > ... > [info] running Coordinator 1 > appid: 95108c48-7c69-4eeb-adbd-9d091bd84933 > [0] starting instance +1 > forwardAll Start > [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but > KeyValueIterator value: 1 > # log file > ... 
> 01:05:00.382 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on > flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401 > 01:05:00.388 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.NamedCache -- Named Cache flush > dirtyKeys.size():7873 entries:7873 > 01:05:00.434 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.p.i.ProcessorStateManager -- stream-thread > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > stream-task [0_0] Flushed cache or buffer Counts > ... > 01:05:00.587 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.s.i.CachingKeyValueStore -- KeyValueIterator > all() > 01:05:00.588 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.RocksDBStore -- RocksDB KeyValueIterator all > 01:05:00.590 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.ThreadCache -- stream-thread > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > MemoryLRUCacheBytesIterator cache all() > 01:05:00.591 > [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1] > INFO o.a.k.s.state.internals.Nam
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751759#comment-17751759 ] Matthias J. Sax commented on KAFKA-15259: - {quote}Should we create a new ticket for Kafka Producer? {quote} As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15259 for the producer already. If it's not covering what you think we need, just leave a comment on the ticket. > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down > as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). 
> # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shutdown as the default > behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My excepted behavior is that Kafka Streams should continue processing > even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE > used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once, Kafka Streams > continue processing without rollback when > ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the > debug log (attached file: app_at_least_once.log). > - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka > Streams 3.2.0, as rollback occurs. > (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler > - > [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler] > (2) Transaction abort and shutdown occur > {code:java} > 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Exception occurred during message > send: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread > [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] > stream-task [0_0] Error encountered sending record to topic output-topic for > task 0_0 due to: > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. 
> Exception handler choose to CONTINUE processing in spite of this error but > written offsets would not be recorded. > org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized which is larger than 1048576, which is the > value of the max.request.size configuration. > 2023-07-26 21:27:19 INFO TransactionManager:393 - [Producer > clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, > transactionalId=java-kafka-streams-0_0] Transiting to abortable error state > due to org.apache.kafka.common.errors.RecordTooLargeException: The message is > 1188 bytes when serialized w
[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses
[ https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751756#comment-17751756 ] Matthias J. Sax commented on KAFKA-14747: - We can discuss here on the ticket I guess. The class in question is [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java#L114] We already added a `TRACE` log for this case recently. For using the dropped record sensor, cf. [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java#L66-L70] that does the same thing. Let me know if this helps to get you bootstrapped. > FK join should record discarded subscription responses > -- > > Key: KAFKA-14747 > URL: https://issues.apache.org/jira/browse/KAFKA-14747 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Koma Zhang >Priority: Minor > Labels: beginner, newbie > > FK-joins are subject to a race condition: If the left-hand side record is > updated, a subscription is sent to the right-hand side (including a hash > value of the left-hand side record), and the right-hand side might send back > join responses (also including the original hash). The left-hand side only > processes the responses if the returned hash matches the current hash of the > left-hand side record, because a different hash implies that the left-hand > side record was updated in the meantime (including sending a new > subscription to the right-hand side), and thus the data is stale and the > response should not be processed (joining the response to the new record > could lead to incorrect results). > A similar thing can happen on a right-hand side update that triggers a > response, that might be dropped if the left-hand side record was updated in > parallel.
> While the behavior is correct, we don't record when this happens. We should > consider recording this using the existing "dropped record" sensor or maybe > adding a new sensor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] CalvinConfluent commented on pull request #14138: Fix a race when query isUnderMinIsr
CalvinConfluent commented on PR #14138: URL: https://github.com/apache/kafka/pull/14138#issuecomment-1668274095 @jolshan Can you help take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move
[ https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-15312: --- Description: Not all file systems fsync to disk on close. For KRaft to guarantee that the data has made it to disk before calling rename, it needs to make sure that the file has been fsynced. We have seen cases where the snapshot file has zero-length data on the ext4 file system. {quote} "Delayed allocation" means that the filesystem tries to delay the allocation of physical disk blocks for written data for as long as possible. This policy brings some important performance benefits. Many files are short-lived; delayed allocation can keep the system from writing fleeting temporary files to disk at all. And, for longer-lived files, delayed allocation allows the kernel to accumulate more data and to allocate the blocks for data contiguously, speeding up both the write and any subsequent reads of that data. It's an important optimization which is found in most contemporary filesystems. But, if blocks have not been allocated for a file, there is no need to write them quickly as a security measure. Since the blocks do not yet exist, it is not possible to read somebody else's data from them. So ext4 will not (cannot) write out unallocated blocks as part of the next journal commit cycle. Those blocks will, instead, wait until the kernel decides to flush them out; at that point, physical blocks will be allocated on disk and the data will be made persistent. The kernel doesn't like to let file data sit unwritten for too long, but it can still take a minute or so (with the default settings) for that data to be flushed - far longer than the five seconds normally seen with ext3. And that is why a crash can cause the loss of quite a bit more data when ext4 is being used.
{quote} from: [https://lwn.net/Articles/322823/] {quote}auto_da_alloc (*), noauto_da_alloc Many broken applications don't use fsync() when replacing existing files via patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", "foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will detect the replace-via-rename and replace-via-truncate patterns and force that any delayed allocation blocks are allocated such that at the next journal commit, in the default data=ordered mode, the data blocks of the new file are forced to disk before the rename() operation is committed. This provides roughly the same level of guarantees as ext3, and avoids the "zero-length" problem that can happen when a system crashes before the delayed allocation blocks are forced to disk. {quote} from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html] was: Not all file system fsync to disk on close. For KRaft to guarantee that the data has made it to disk before calling rename it needs to make sure that the file has been fsync. We have seen cases were the snapshot file has zero-length data on ext4 file system. {quote} "Delayed allocation" means that the filesystem tries to delay the allocation of physical disk blocks for written data for as long as possible. This policy brings some important performance benefits. Many files are short-lived; delayed allocation can keep the system from writing fleeting temporary files to disk at all. And, for longer-lived files, delayed allocation allows the kernel to accumulate more data and to allocate the blocks for data contiguously, speeding up both the write and any subsequent reads of that data. It's an important optimization which is found in most contemporary filesystems. But, if blocks have not been allocated for a file, there is no need to write them quickly as a security measure. 
Since the blocks do not yet exist, it is not possible to read somebody else's data from them. So ext4 will not (cannot) write out unallocated blocks as part of the next journal commit cycle. Those blocks will, instead, wait until the kernel decides to flush them out; at that point, physical blocks will be allocated on disk and the data will be made persistent. The kernel doesn't like to let file data sit unwritten for too long, but it can still take a minute or so (with the default settings) for that data to be flushed - far longer than the five seconds normally seen with ext3. And that is why a crash can cause the loss of quite a bit more data when ext4 is being used. {quote} from: [https://lwn.net/Articles/322823/] {quote}auto_da_alloc(*), noauto_da_alloc Many broken applications don't use fsync() when replacing existing files via patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", "foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will detect the replace-via-rename and replace-via-truncate patterns and force that
[jira] [Created] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move
José Armando García Sancio created KAFKA-15312: -- Summary: FileRawSnapshotWriter must flush before atomic move Key: KAFKA-15312 URL: https://issues.apache.org/jira/browse/KAFKA-15312 Project: Kafka Issue Type: Bug Components: kraft Reporter: José Armando García Sancio Assignee: José Armando García Sancio Fix For: 3.6.0 Not all file systems fsync to disk on close. For KRaft to guarantee that the data has made it to disk before calling rename, it needs to make sure that the file has been fsynced. We have seen cases where the snapshot file has zero-length data on the ext4 file system. {quote} "Delayed allocation" means that the filesystem tries to delay the allocation of physical disk blocks for written data for as long as possible. This policy brings some important performance benefits. Many files are short-lived; delayed allocation can keep the system from writing fleeting temporary files to disk at all. And, for longer-lived files, delayed allocation allows the kernel to accumulate more data and to allocate the blocks for data contiguously, speeding up both the write and any subsequent reads of that data. It's an important optimization which is found in most contemporary filesystems. But, if blocks have not been allocated for a file, there is no need to write them quickly as a security measure. Since the blocks do not yet exist, it is not possible to read somebody else's data from them. So ext4 will not (cannot) write out unallocated blocks as part of the next journal commit cycle. Those blocks will, instead, wait until the kernel decides to flush them out; at that point, physical blocks will be allocated on disk and the data will be made persistent. The kernel doesn't like to let file data sit unwritten for too long, but it can still take a minute or so (with the default settings) for that data to be flushed - far longer than the five seconds normally seen with ext3. And that is why a crash can cause the loss of quite a bit more data when ext4 is being used.
{quote} from: [https://lwn.net/Articles/322823/] {quote}auto_da_alloc(*), noauto_da_alloc Many broken applications don't use fsync() when replacing existing files via patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", "foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If auto_da_alloc is enabled, ext4 will detect the replace-via-rename and replace-via-truncate patterns and force that any delayed allocation blocks are allocated such that at the next journal commit, in the default data=ordered mode, the data blocks of the new file are forced to disk before the rename() operation is committed. This provides roughly the same level of guarantees as ext3, and avoids the "zero-length" problem that can happen when a system crashes before the delayed allocation blocks are forced to disk. {quote} from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] clolov opened a new pull request, #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property
clolov opened a new pull request, #14161: URL: https://github.com/apache/kafka/pull/14161 # TODO I need to find where the unit/integration tests for this behaviour are in order to add the manual test to them ### Summary The purpose of this change is to not allow a broker to start up with Tiered Storage disabled (`remote.log.storage.system.enable=false`) while there are still topics which have `remote.storage.enable` set. ### Testing ``` [2023-08-07 18:01:30,654] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$) org.apache.kafka.common.config.ConfigException: You have to delete all topics with the property remote.storage.enable (i.e. mrfreeze) before disabling tiered storage cluster-wide at kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:71) at kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:175) at kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:174) at scala.collection.immutable.Map$Map4.foreach(Map.scala:569) at kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:174) at kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:165) at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115) at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:165) at kafka.server.KafkaServer.startup(KafkaServer.scala:571) at kafka.Kafka$.main(Kafka.scala:113) at kafka.Kafka.main(Kafka.scala) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors
[ https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751747#comment-17751747 ] Chris Egerton commented on KAFKA-15310: --- Hey [~romsouza], thanks for filing this ticket! Since the proposed changes affect the public interface (i.e., they add a configuration property to part of the project), a KIP is required. You can read up on the KIP process [here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]. Thanks again for your interest in contributing to Kafka! > Add timezone configuration option in TimestampConverter from connectors > --- > > Key: KAFKA-15310 > URL: https://issues.apache.org/jira/browse/KAFKA-15310 > Project: Kafka > Issue Type: New Feature > Components: config, connect >Reporter: Romulo Souza >Priority: Minor > Labels: needs-kip > Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de > tela de 2023-08-05 09-44-25-1.png > > > In some scenarios where TimestampConverter is used, it would be > useful to have an option to specify a timezone other than UTC > (hardcoded). E.g., there are use cases where a sink connector sends data to a > database and this same data is used in an analysis tool without formatting and > transformation options. > A new optional Kafka Connect configuration should be added to set the > desired timezone, with a fallback to UTC when not specified. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors
[ https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15310: -- Labels: needs-kip (was: ) > Add timezone configuration option in TimestampConverter from connectors > --- > > Key: KAFKA-15310 > URL: https://issues.apache.org/jira/browse/KAFKA-15310 > Project: Kafka > Issue Type: New Feature > Components: config, connect >Reporter: Romulo Souza >Priority: Minor > Labels: needs-kip > Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de > tela de 2023-08-05 09-44-25-1.png > > > In some scenarios where TimestampConverter is used, it's > interesting to have an option to determine a specific timezone other than UTC > (hardcoded). E.g., there are use cases where a sink connector sends data to a > database and this same data is used in an analysis tool without formatting and > transformation options. > A new optional Kafka Connect configuration should be added to set the > desired timezone, with a fallback to UTC when not specified. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced
gharris1727 commented on PR #14156: URL: https://github.com/apache/kafka/pull/14156#issuecomment-1668233376 @monish-byte This specific PR thread isn't the right place to discuss this, so please in the future post your general questions on the dev mailing list. Your next steps should be: > ask [on the mailing list] for access to the JIRA and Confluence. From there, you can ask more questions [on the mailing list] about contributing, submit your own issues, and work on issues that have already been reported. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14158: KAFKA-15296: Allow offsets to be committed for filtered records when Exactly Once support is disabled
C0urante commented on PR #14158: URL: https://github.com/apache/kafka/pull/14158#issuecomment-1668229509 I've been scratching my head over this one for a bit. On the one hand, it's nice to allow heavily-filtered source connectors to record progress (and this was a suggestion I made to address part of the motivation for [KIP-910](https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records)) so that there are fewer duplicates if one is restarted. However, the current behavior when exactly-once support is disabled also has some benefits. Right now it's possible to write an SMT that does batching of many source records into a single Kafka record. I'm also curious--what's the behavior with sink connectors when records are filtered via SMT? Does this vary depending on whether the connector's task class overrides the `SinkTask::preCommit` method? @vamossagar12 Ultimately I agree that some work probably has to be done around this logic, and thanks for identifying the discrepancy. I'm just not certain that the decision I made to commit offsets for dropped records when working on exactly-once source connectors was the correct one, and think we should at least consider reverting that change in behavior rather than updating other, longer-existing modes to align with it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-14598) Fix flaky ConnectRestApiTest
[ https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751735#comment-17751735 ] Greg Harris edited comment on KAFKA-14598 at 8/7/23 4:33 PM: - [~ashwinpankaj] The PR for this is a one line fix, and you already proved that it was fixing a flaky failure. I don't think this should be closed as the problem has not been addressed. If you no longer wish to work on this, leave it open and unassigned. was (Author: gharris1727): [~ashwinpankaj] The PR for this is a one line fix, and you already proved that it was fixing a flaky failure. I don't think this should be closed as the problem has not been addressed. > Fix flaky ConnectRestApiTest > > > Key: KAFKA-14598 > URL: https://issues.apache.org/jira/browse/KAFKA-14598 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ashwin Pankaj >Assignee: Ashwin Pankaj >Priority: Minor > Labels: flaky-test > > ConnectRestApiTest sometimes fails with the message > {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not > Found\n\nHTTP ERROR 404 Not > Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not > > Found\nSERVLET:-\n\n\n\n\n', > 'http://172.31.1.75:8083/connector-plugins/')}} > This happens because ConnectDistributedService.start() by default waits till > the line > {{Joined group at generation ..}} is visible in the logs. > In most cases this is sufficient. But in the cases where the test fails, we > see that this message appears even before Connect RestServer has finished > initialization. 
> {quote} - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, > groupId=connect-cluster] Joined group at generation 2 with protocol version 1 > and got assignment: Assignment{error=0, > leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', > leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], > revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 > +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" > "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer) > - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is > started and ready to handle requests > (org.apache.kafka.connect.runtime.rest.RestServer) > {quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14598) Fix flaky ConnectRestApiTest
[ https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751735#comment-17751735 ] Greg Harris commented on KAFKA-14598: - [~ashwinpankaj] The PR for this is a one line fix, and you already proved that it was fixing a flaky failure. I don't think this should be closed as the problem has not been addressed. > Fix flaky ConnectRestApiTest > > > Key: KAFKA-14598 > URL: https://issues.apache.org/jira/browse/KAFKA-14598 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ashwin Pankaj >Assignee: Ashwin Pankaj >Priority: Minor > Labels: flaky-test > > ConnectRestApiTest sometimes fails with the message > {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not > Found\n\nHTTP ERROR 404 Not > Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not > > Found\nSERVLET:-\n\n\n\n\n', > 'http://172.31.1.75:8083/connector-plugins/')}} > This happens because ConnectDistributedService.start() by default waits till > the line > {{Joined group at generation ..}} is visible in the logs. > In most cases this is sufficient. But in the cases where the test fails, we > see that this message appears even before Connect RestServer has finished > initialization. 
> {quote} - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, > groupId=connect-cluster] Joined group at generation 2 with protocol version 1 > and got assignment: Assignment{error=0, > leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', > leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], > revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 > +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" > "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer) > - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is > started and ready to handle requests > (org.apache.kafka.connect.runtime.rest.RestServer) > {quote} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] bachmanity1 commented on pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest
bachmanity1 commented on PR #14153: URL: https://github.com/apache/kafka/pull/14153#issuecomment-1668221501 Hi @divijvaidya Thanks for letting me know about ticket KAFKA-14132. It turns out that @mdedetrich has already begun working on this issue and has submitted a PR, but he said he doesn't have enough time to complete it and agreed to close it. Thus, I think we can handle this issue in this PR. When you have time could you please review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15311) Fix docs about reverting to ZooKeeper mode during KRaft migration
[ https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15311: - Summary: Fix docs about reverting to ZooKeeper mode during KRaft migration (was: Docs incorrectly state that reverting to ZooKeeper mode during KRaft migration is not possible) > Fix docs about reverting to ZooKeeper mode during KRaft migration > - > > Key: KAFKA-15311 > URL: https://issues.apache.org/jira/browse/KAFKA-15311 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Minor > > The docs incorrectly state that reverting to ZooKeeper mode during KRaft > migration is not possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15311) Docs incorrectly state that reverting to ZooKeeper mode during KRaft migration is not possible
[ https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15311: - Description: The docs incorrectly state that reverting to ZooKeeper mode during KRaft migration is not possible > Docs incorrectly state that reverting to ZooKeeper mode during KRaft > migration is not possible > -- > > Key: KAFKA-15311 > URL: https://issues.apache.org/jira/browse/KAFKA-15311 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Minor > > The docs incorrectly state that reverting to ZooKeeper mode during KRaft > migration is not possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15311) Fix docs about reverting to ZooKeeper mode during KRaft migration
[ https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-15311: Assignee: Colin McCabe > Fix docs about reverting to ZooKeeper mode during KRaft migration > - > > Key: KAFKA-15311 > URL: https://issues.apache.org/jira/browse/KAFKA-15311 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Minor > > The docs incorrectly state that reverting to ZooKeeper mode during KRaft > migration is not possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe opened a new pull request, #14160: KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration
cmccabe opened a new pull request, #14160: URL: https://github.com/apache/kafka/pull/14160 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15311) Docs incorrectly state that reverting to ZooKeeper mode during the migration is not possible
Colin McCabe created KAFKA-15311: Summary: Docs incorrectly state that reverting to ZooKeeper mode during the migration is not possible Key: KAFKA-15311 URL: https://issues.apache.org/jira/browse/KAFKA-15311 Project: Kafka Issue Type: Bug Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15311) Docs incorrectly state that reverting to ZooKeeper mode during KRaft migration is not possible
[ https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15311: - Summary: Docs incorrectly state that reverting to ZooKeeper mode during KRaft migration is not possible (was: Docs incorrectly state that reverting to ZooKeeper mode during the migration is not possible) > Docs incorrectly state that reverting to ZooKeeper mode during KRaft > migration is not possible > -- > > Key: KAFKA-15311 > URL: https://issues.apache.org/jira/browse/KAFKA-15311 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14159: Kafka 15291 connect plugins implement versioned
gharris1727 commented on code in PR #14159: URL: https://github.com/apache/kafka/pull/14159#discussion_r1286093427 ## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java: ## @@ -16,24 +16,19 @@ */ package org.apache.kafka.connect.transforms; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.source.SourceRecord; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; public class TimestampRouterTest { -private final TimestampRouter xform = new TimestampRouter<>(); Review Comment: This should probably be paired with a BeforeEach instead of duplicating it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14159: Kafka 15291 connect plugins implement versioned
gharris1727 commented on code in PR #14159: URL: https://github.com/apache/kafka/pull/14159#discussion_r1286093427 ## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java: ## @@ -16,24 +16,19 @@ */ package org.apache.kafka.connect.transforms; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.source.SourceRecord; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import java.util.Collections; import static org.junit.jupiter.api.Assertions.assertEquals; public class TimestampRouterTest { -private final TimestampRouter xform = new TimestampRouter<>(); Review Comment: This should probably be paired with a @BeforeEach instead of duplicating it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)
lucasbru commented on code in PR #14001: URL: https://github.com/apache/kafka/pull/14001#discussion_r1286114522 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ## @@ -86,12 +87,29 @@ private void runOnce(final long nowMs) { if (currentTask == null) { currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this); -} else { -// if a task is no longer processable, ask task-manager to give it another -// task in the next iteration -if (currentTask.isProcessable(nowMs)) { +} + +if (currentTask != null) { Review Comment: Hmm, that's a good point! Not sure if I'm worried about getting back null (we still need the check even with a condition variable), but it'd be good to avoid busy waiting. But we still want to shut-down cleanly, so we still want to check isRunning and isPaused as well...) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)
lucasbru commented on code in PR #14001: URL: https://github.com/apache/kafka/pull/14001#discussion_r1286114522 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ## @@ -86,12 +87,29 @@ private void runOnce(final long nowMs) { if (currentTask == null) { currentTask = taskManager.assignNextTask(DefaultTaskExecutor.this); -} else { -// if a task is no longer processable, ask task-manager to give it another -// task in the next iteration -if (currentTask.isProcessable(nowMs)) { +} + +if (currentTask != null) { Review Comment: Hmm, that's a good point! Not sure if I'm worried about getting back null (we still need the check even with a condition variable), but it'd be good to avoid busy waiting. But we still want to shut-down cleanly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #14158: KAFKA-15296: Allow offsets to be committed for filtered records when Exactly Once support is disabled
vamossagar12 commented on PR #14158: URL: https://github.com/apache/kafka/pull/14158#issuecomment-1668174595 Yash, Greg I have marked you as reviewers for this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14421) OffsetFetchRequest throws NPE Exception
[ https://issues.apache.org/jira/browse/KAFKA-14421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751726#comment-17751726 ] Philip Nee commented on KAFKA-14421: [~showuon] - should we close this issue as it is not supported. > OffsetFetchRequest throws NPE Exception > --- > > Key: KAFKA-14421 > URL: https://issues.apache.org/jira/browse/KAFKA-14421 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: yws >Assignee: yws >Priority: Major > Attachments: image-2022-11-27-22-28-52-165.png, > image-2022-11-27-22-41-45-358.png > > > when I use 0.10.2 client send Metadata request to 0.10.0 server, NPE > exception happens, > !image-2022-11-27-22-28-52-165.png! > the NPE exception quite confused me, because if just send Metadata request > doest not cause the NPE exception occurs, after troubleshooting the problem, > It is the NetworkClient#poll call ConsumerNetworkClient#trySend and further > call NetworkClient#doSendwhen trying to build OffsetFetchRequest, because > the 0.10.0 server doest not support fetch all TopicPartitions, it throw > UnsupportedVersionException, > {code:java} > private void doSend(ClientRequest clientRequest, boolean isInternalRequest, > long now) { > String nodeId = clientRequest.destination(); > .. > AbstractRequest request = null; > AbstractRequest.Builder builder = clientRequest.requestBuilder(); > try { > NodeApiVersions versionInfo = nodeApiVersions.get(nodeId); > // Note: if versionInfo is null, we have no server version > information. This would be > // the case when sending the initial ApiVersionRequest which > fetches the version > // information itself. It is also the case when > discoverBrokerVersions is set to false. > if (versionInfo == null) { > if (discoverBrokerVersions && log.isTraceEnabled()) > log.trace("No version information found when sending > message of type {} to node {}. 
" + > "Assuming version {}.", clientRequest.apiKey(), > nodeId, builder.version()); > } else { > short version = > versionInfo.usableVersion(clientRequest.apiKey()); > builder.setVersion(version); > } > // The call to build may also throw UnsupportedVersionException, > if there are essential > // fields that cannot be represented in the chosen version. > request = builder.build(); > } catch (UnsupportedVersionException e) { > // If the version is not supported, skip sending the request over > the wire. > // Instead, simply add it to the local queue of aborted requests. > log.debug("Version mismatch when attempting to send {} to {}", > clientRequest.toString(), clientRequest.destination(), e); > ClientResponse clientResponse = new > ClientResponse(clientRequest.makeHeader(), > clientRequest.callback(), clientRequest.destination(), > now, now, > false, e, null); > abortedSends.add(clientResponse); > return; > } > {code} > !image-2022-11-27-22-41-45-358.png! > until now, all are expected, but unfortunately, in catch > UnsupportedVersionException code block, clientRequest.toString need to call > requestBuilder#toString, that is OffsetFetchRequest's Builder#toString, when > partition is ALL_TOPIC_PARTITIONS, it is null, therefore it cause the > unexpected NPE, and make the normal MetadataRequest failed.. 
> {code:java} > catch (UnsupportedVersionException e) { > > log.debug("Version mismatch when attempting to send {} to {}", > clientRequest.toString(), clientRequest.destination(), e); > ClientResponse clientResponse = new > ClientResponse(clientRequest.makeHeader(), > clientRequest.callback(), clientRequest.destination(), > now, now, > false, e, null); > abortedSends.add(clientResponse); > return; > } > ClientRequest#toString() >public String toString() { > return "ClientRequest(expectResponse=" + expectResponse + > ", callback=" + callback + > ", destination=" + destination + > ", correlationId=" + correlationId + > ", clientId=" + clientId + > ", createdTimeMs=" + createdTimeMs + > ", requestBuilder=" + requestBuilder + > ")"; > } > OffsetFetchRequest's Builder#toString > public String toString() { > StringBuilder bld = n
[GitHub] [kafka] kamalcph commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
kamalcph commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286092708 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java: ## @@ -104,6 +106,22 @@ public class RemoteLogMetadataCache { // https://issues.apache.org/jira/browse/KAFKA-12641 protected final ConcurrentMap leaderEpochEntries = new ConcurrentHashMap<>(); +private final CountDownLatch initializedLatch = new CountDownLatch(1); + +public void markInitialized() { +initializedLatch.countDown(); +} + +public void ensureInitialized() throws InterruptedException { +if (!initializedLatch.await(2, TimeUnit.MINUTES)) { Review Comment: We can reduce the timeout to 10 seconds but should think of a solution for Admin#listOffsets call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286091923 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + +private final int numMetadataTopicPartitions = 5; +private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); +private final DummyEventHandler handler = new DummyEventHandler(); +private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() +.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); +private final Uuid topicId = Uuid.randomUuid(); +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +private ConsumerTask consumerTask; +private MockConsumer consumer; +private Thread thread; + +@BeforeEach +public void beforeEach() { +final Map offsets = remoteLogPartitions.stream() +.collect(Collectors.toMap(Function.identity(), e -> 0L)); +consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +consumer.updateBeginningOffsets(offsets); +ConsumerTask.pollIntervalMs = 10L; +consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); +thread = new Thread(consumerTask); +} + +@AfterEach +public void afterEach() throws InterruptedException { +if (thread != null) { +consumerTask.close(); +thread.join(); +} +} + +@Test +public void testCloseOnNoAssignment() throws InterruptedException { +thread.start(); +Thread.sleep(10); +
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286090883 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + +private final int numMetadataTopicPartitions = 5; +private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); +private final DummyEventHandler handler = new DummyEventHandler(); +private final Set remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() +.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); +private final Uuid topicId = Uuid.randomUuid(); +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + +private ConsumerTask consumerTask; +private MockConsumer consumer; +private Thread thread; + +@BeforeEach +public void beforeEach() { +final Map offsets = remoteLogPartitions.stream() +.collect(Collectors.toMap(Function.identity(), e -> 0L)); +consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); +consumer.updateBeginningOffsets(offsets); +ConsumerTask.pollIntervalMs = 10L; +consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); +thread = new Thread(consumerTask); +} + +@AfterEach +public void afterEach() throws InterruptedException { +if (thread != null) { +consumerTask.close(); +thread.join(); +} +} + +@Test +public void testCloseOnNoAssignment() throws InterruptedException { +thread.start(); +Thread.sleep(10); Revi
[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions
abhijeetk88 commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1286089506 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ## @@ -64,302 +65,387 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); -private static final long POLL_INTERVAL_MS = 100L; +static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); -private final KafkaConsumer consumer; -private final String metadataTopicName; +private final Consumer consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; -private final Time time; +private final Time time = new SystemTime(); +// TODO - Update comments below // It indicates whether the closing process has been started or not. If it is set as true, // consumer will stop consuming messages, and it will not allow partition assignments to be updated. -private volatile boolean closing = false; - +private volatile boolean isClosed = false; // It indicates whether the consumer needs to assign the partitions or not. This is set when it is // determined that the consumer needs to be assigned with the updated partitions. -private volatile boolean assignPartitions = false; +private volatile boolean isAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. -private volatile Set assignedMetaPartitions = Collections.emptySet(); +private volatile Set assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. 
-private Set assignedTopicPartitions = Collections.emptySet(); +private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap(); +private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); -// Map of remote log metadata topic partition to consumed offsets. Received consumer records -// may or may not have been processed based on the assigned topic partitions. -private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>(); +private long uninitializedAt = time.milliseconds(); +private boolean isAllUserTopicPartitionsInitialized; -// Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. -private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); +// Map of remote log metadata topic partition to consumed offsets. +private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); +private final Map readOffsetsByUserTopicPartition = new HashMap<>(); -private final long committedOffsetSyncIntervalMs; -private CommittedOffsetsFile committedOffsetsFile; -private long lastSyncedTimeMs; +private Map offsetHolderByMetadataPartition = new HashMap<>(); +private boolean isOffsetsFetchFailed = false; +private long lastFailedFetchOffsetsTimestamp; -public ConsumerTask(KafkaConsumer consumer, -String metadataTopicName, -RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, +public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, -Path committedOffsetsPath, -Time time, -long committedOffsetSyncIntervalMs) { -this.consumer = Objects.requireNonNull(consumer); -this.metadataTopicName = Objects.requireNonNull(metadataTopicName); +Function, Consumer> consumerSupplier) { +this.consumer = consumerSupplier.apply(Optional.empty()); this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); this.topicPartitioner 
= Objects.requireNonNull(topicPartitioner); -this.time = Objects.requireNonNull(time); -this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; - -initializeConsumerAssignment(committedOffsetsPath); -} - -private void initializeConsumerAssignment(Path committedOffsetsPath) { -try { -committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); -} catch (IOException e) { -throw new KafkaException(e); -} - -Map committedOffsets = Collections.emptyMap(); -try { -// Load committed offset a
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286059074 ## core/src/main/scala/kafka/server/KafkaBroker.scala: ## @@ -19,6 +19,7 @@ package kafka.server import com.yammer.metrics.core.MetricName import kafka.log.LogManager +import kafka.log.remote.RemoteLogManager Review Comment: This import is being used in L83. I removed the unused import "scala.collection.Seq" from this class, but that produced the compile errors below, so I reverted it since this change is unrelated to this PR: ``` ❯ ./gradlew :storage:compileTestJava -PscalaVersion=2.13 > Configure project : Starting build with version 3.6.0-SNAPSHOT (commit id d109f88d) using Gradle 8.2.1, Java 11 and Scala 2.13.11 Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0 > Task :core:compileScala [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:910:69: type mismatch; found : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter] required: Seq[AnyRef] [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:915:61: type mismatch; found : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter] required: Seq[AnyRef] [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:267:77: type mismatch; found : Seq[Object] (in scala.collection) required: Seq[AnyRef] (in scala.collection.immutable) [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:117:15: private val kafkaMetricsReporters in class KafkaServer is never used four errors found ``` ## core/src/main/scala/kafka/server/KafkaBroker.scala: ## @@ -19,6 +19,7 @@ package kafka.server import com.yammer.metrics.core.MetricName import kafka.log.LogManager +import kafka.log.remote.RemoteLogManager Review Comment: This import is being used in L83. 
I removed the unused import "scala.collection.Seq" from this class, but that produced the compile errors below, so I reverted it since the change is unrelated to this PR: ``` ❯ ./gradlew :storage:compileTestJava -PscalaVersion=2.13 > Configure project : Starting build with version 3.6.0-SNAPSHOT (commit id d109f88d) using Gradle 8.2.1, Java 11 and Scala 2.13.11 Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0 > Task :core:compileScala [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:910:69: type mismatch; found : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter] required: Seq[AnyRef] [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:915:61: type mismatch; found : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter] required: Seq[AnyRef] [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:267:77: type mismatch; found : Seq[Object] (in scala.collection) required: Seq[AnyRef] (in scala.collection.immutable) [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:117:15: private val kafkaMetricsReporters in class KafkaServer is never used four errors found ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on PR #14116: URL: https://github.com/apache/kafka/pull/14116#issuecomment-1668144105 > This leads to a situation where the tests we add will not be runnable with kraft/zk mode easily. We are not using the ZooKeeper client directly anywhere, so KRaft mode should work as expected. > My main concern is that we are using IntegrationTestHarness but duplicating a lot of it's code in our own implementation in TieredStorageContext Agreed, there is some code duplication, but it gives us the flexibility to reuse the client, which avoids log pollution while debugging a failed test. > unnecessary specs and actions The plan is to add the respective tests, such as ReassignReplicaShrinkTest, ReassignReplicaExpandTest, ReassignReplicaMoveTest, PartitionsExpandTest, DeleteSegmentsByRetentionTimeTest, ListOffsetsTest and so on, once the build becomes stable. We can remove the unused actions if required. Thanks for reviewing the PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286072822 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.actions.BounceBrokerAction; +import org.apache.kafka.tiered.storage.actions.ConsumeAction; +import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction; +import org.apache.kafka.tiered.storage.actions.CreateTopicAction; +import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction; +import org.apache.kafka.tiered.storage.actions.DeleteTopicAction; +import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction; +import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction; +import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction; +import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction; +import org.apache.kafka.tiered.storage.actions.ProduceAction; +import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction; +import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction; +import org.apache.kafka.tiered.storage.actions.StartBrokerAction; +import org.apache.kafka.tiered.storage.actions.StopBrokerAction; +import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction; +import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction; +import org.apache.kafka.tiered.storage.specs.ConsumableSpec; +import org.apache.kafka.tiered.storage.specs.DeletableSpec; +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.FetchableSpec; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; +import org.apache.kafka.tiered.storage.specs.OffloadableSpec; +import 
org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec; +import org.apache.kafka.tiered.storage.specs.ProducableSpec; +import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec; +import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public final class TieredStorageTestBuilder { + +private final int defaultProducedBatchSize = 1; +private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0; + +private Map producables = new HashMap<>(); +private Map> offloadables = new HashMap<>(); +private Map consumables = new HashMap<>(); +private Map fetchables = new HashMap<>(); +private Map> deletables = new HashMap<>(); +private List actions = new ArrayList<>(); + +public TieredStorageTestBuilder() { +} + +public TieredStorageTestBuilder createTopic(String topic, +Integer partitionCount, +Integer replicationFactor, +Integer maxBatchCountPerSegment, +Map> replicaAssignment, +Boolean enableRemoteLogStorage) { +assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1"; +assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1"
[GitHub] [kafka] vamossagar12 commented on pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments
vamossagar12 commented on PR #14000: URL: https://github.com/apache/kafka/pull/14000#issuecomment-1668141669 Test failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286074651 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.actions.BounceBrokerAction; +import org.apache.kafka.tiered.storage.actions.ConsumeAction; +import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction; +import org.apache.kafka.tiered.storage.actions.CreateTopicAction; +import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction; +import org.apache.kafka.tiered.storage.actions.DeleteTopicAction; +import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction; +import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction; +import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction; +import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction; +import org.apache.kafka.tiered.storage.actions.ProduceAction; +import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction; +import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction; +import org.apache.kafka.tiered.storage.actions.StartBrokerAction; +import org.apache.kafka.tiered.storage.actions.StopBrokerAction; +import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction; +import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction; +import org.apache.kafka.tiered.storage.specs.ConsumableSpec; +import org.apache.kafka.tiered.storage.specs.DeletableSpec; +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.FetchableSpec; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; +import org.apache.kafka.tiered.storage.specs.OffloadableSpec; +import 
org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec; +import org.apache.kafka.tiered.storage.specs.ProducableSpec; +import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec; +import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public final class TieredStorageTestBuilder { + +private final int defaultProducedBatchSize = 1; +private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0; + +private Map producables = new HashMap<>(); +private Map> offloadables = new HashMap<>(); +private Map consumables = new HashMap<>(); +private Map fetchables = new HashMap<>(); +private Map> deletables = new HashMap<>(); +private List actions = new ArrayList<>(); + +public TieredStorageTestBuilder() { +} + +public TieredStorageTestBuilder createTopic(String topic, +Integer partitionCount, +Integer replicationFactor, +Integer maxBatchCountPerSegment, +Map> replicaAssignment, +Boolean enableRemoteLogStorage) { +assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1"; +assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1"
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286072822 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.actions.BounceBrokerAction; +import org.apache.kafka.tiered.storage.actions.ConsumeAction; +import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction; +import org.apache.kafka.tiered.storage.actions.CreateTopicAction; +import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction; +import org.apache.kafka.tiered.storage.actions.DeleteTopicAction; +import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction; +import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction; +import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction; +import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction; +import org.apache.kafka.tiered.storage.actions.ProduceAction; +import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction; +import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction; +import org.apache.kafka.tiered.storage.actions.StartBrokerAction; +import org.apache.kafka.tiered.storage.actions.StopBrokerAction; +import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction; +import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction; +import org.apache.kafka.tiered.storage.specs.ConsumableSpec; +import org.apache.kafka.tiered.storage.specs.DeletableSpec; +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.FetchableSpec; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; +import org.apache.kafka.tiered.storage.specs.OffloadableSpec; +import 
org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec; +import org.apache.kafka.tiered.storage.specs.ProducableSpec; +import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec; +import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public final class TieredStorageTestBuilder { + +private final int defaultProducedBatchSize = 1; +private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0; + +private Map producables = new HashMap<>(); +private Map> offloadables = new HashMap<>(); +private Map consumables = new HashMap<>(); +private Map fetchables = new HashMap<>(); +private Map> deletables = new HashMap<>(); +private List actions = new ArrayList<>(); + +public TieredStorageTestBuilder() { +} + +public TieredStorageTestBuilder createTopic(String topic, +Integer partitionCount, +Integer replicationFactor, +Integer maxBatchCountPerSegment, +Map> replicaAssignment, +Boolean enableRemoteLogStorage) { +assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1"; +assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1"