[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-08-07 Thread via GitHub


fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1286654127


##
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+    private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error occurred: " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
+        GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+        GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+        Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            TopicPartition topic = entry.getKey();
+
+            System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
+        }
+    }
+
+    private static class GetOffsetShellOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicPartitionsOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> partitionsOpt;
+        private final OptionSpec<String> timeOpt;
+        private final OptionSpec<String> commandConfigOpt;
+        private final OptionSpec<String> effectiveBrokerListOpt;
+        private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+        public GetOffsetShellOptions(String[] args) throws TerseException {
+            super(args);
+
+            brokerListOpt = parser.accepts("broker-list
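
For context, the imports in the hunk above (Admin, OffsetSpec, ListOffsetsResult) point at an Admin-client based offset lookup. The following is only a minimal, hypothetical sketch of that API usage, not the PR's actual fetchOffsets implementation; the topic name and bootstrap server are placeholders.

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ListLatestOffsetSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder
            // OffsetSpec.latest() asks for the log-end offset; earliest() or forTimestamp(ts) work the same way.
            Map<TopicPartition, ListOffsetsResultInfo> offsets =
                admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latest())).all().get();
            // Same topic:partition:offset output format the tool prints.
            System.out.println(tp.topic() + ":" + tp.partition() + ":" + offsets.get(tp).offset());
        }
    }
}
{code}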

[GitHub] [kafka] abhijeetk88 commented on pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on PR #14127:
URL: https://github.com/apache/kafka/pull/14127#issuecomment-1668965018

   Thanks for the review @showuon. I have addressed your comments and responded 
to your questions. Please take a look.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286634284


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+
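
For context on the test setup quoted above, here is a minimal, self-contained sketch of the MockConsumer pattern it relies on (assign, declare beginning offsets, add a record in memory, poll it back). The topic name is only illustrative; this is not code from the PR.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;

public class MockConsumerSketch {
    public static void main(String[] args) {
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("__remote_log_metadata", 0); // illustrative

        // Assign the partition and declare its beginning offset so poll() can resolve a position.
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        // Hand the consumer a record as if the broker had returned it.
        consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), 0L, new byte[0], new byte[0]));

        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10));
        System.out.println("polled " + records.count() + " record(s)"); // prints 1
    }
}
{code}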

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286633888


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286633719


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286632651


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286632214


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@

Revi

[GitHub] [kafka] alok123t commented on a diff in pull request #14162: KAFKA-15312; Force channel before atomic file move

2023-08-07 Thread via GitHub


alok123t commented on code in PR #14162:
URL: https://github.com/apache/kafka/pull/14162#discussion_r1286557850


##
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java:
##
@@ -112,6 +112,9 @@ public void freeze() {
 checkIfFrozen("Freeze");
 
 frozenSize = channel.size();
+// force the channel to write to the file system before closing, 
this guarantees that the file has the data
+// on disk before preforming the atomic file move

Review Comment:
   typo: performing
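
For background on the comment under review (forcing the channel before the rename), here is a small, hypothetical sketch of the force-then-atomic-move pattern using plain java.nio; the paths and contents are placeholders and this is not the PR's FileRawSnapshotWriter code.

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class AtomicSnapshotMoveSketch {
    public static void main(String[] args) throws IOException {
        Path tmp = Paths.get("snapshot.tmp");          // placeholder temp file
        Path dest = Paths.get("snapshot.checkpoint");  // placeholder final name

        try (FileChannel channel = FileChannel.open(tmp,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap("snapshot-bytes".getBytes(StandardCharsets.UTF_8)));
            // Flush data and metadata to the file system before the rename, so the
            // file is already durable on disk when it appears under its final name.
            channel.force(true);
        }
        // The atomic move makes the file visible under its final name in one step.
        Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}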






[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751878#comment-17751878
 ] 

Guozhang Wang commented on KAFKA-15259:
---

Got it, KAFKA-15309 makes sense.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> exactly_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using exactly_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with exactly_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using exactly_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of exactly_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Transiting to abortable error state 
> due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-Stre

[jira] [Updated] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-07 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-15259:
---
Description: 
[Problem]
 - Kafka Streams does not continue processing due to rollback despite 
ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
 -- "CONTINUE will signal that Streams should ignore the issue and continue 
processing"(1), so Kafka Streams should continue processing even if using 
exactly_once when ProductionExceptionHandlerResponse.CONTINUE used.
 -- However, if using exactly_once, Kafka Streams does not continue processing 
due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the 
client will be shut down as the default 
behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 

[Environment]
 - Kafka Streams 3.5.1

[Reproduction procedure]
 # Create "input-topic" topic and "output-topic"
 # Put several messages on "input-topic"
 # Execute a simple Kafka streams program that transfers too large messages 
from "input-topic" to "output-topic" with exactly_once and returns 
ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
producer. Please refer to the reproducer program (attached file: 
Reproducer.java).
 # ==> However, Kafka Streams does not continue processing due to rollback 
despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
shutdown as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
(2). Please refer to the debug log (attached file: app_exactly_once.log).
 ## My expected behavior is that Kafka Streams should continue processing even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE is used.

[As far as my investigation]
 - FYI, if using at_least_once instead of exactly_once, Kafka Streams continue 
processing without rollback when ProductionExceptionHandlerResponse.CONTINUE is 
used. Please refer to the debug log (attached file: app_at_least_once.log).
- "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
Streams 3.2.0, as rollback occurs.

(1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
 - 
[https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]

(2) Transaction abort and shutdown occur
{code:java}
2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
 transactionalId=java-kafka-streams-0_0] Exception occurred during message send:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 
bytes when serialized which is larger than 1048576, which is the value of the 
max.request.size configuration.
2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
stream-task [0_0] Error encountered sending record to topic output-topic for 
task 0_0 due to:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 
bytes when serialized which is larger than 1048576, which is the value of the 
max.request.size configuration.
Exception handler choose to CONTINUE processing in spite of this error but 
written offsets would not be recorded.
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 
bytes when serialized which is larger than 1048576, which is the value of the 
max.request.size configuration.
2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
 transactionalId=java-kafka-streams-0_0] Transiting to abortable error state 
due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 
1188 bytes when serialized which is larger than 1048576, which is the value 
of the max.request.size configuration.
2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer 
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
 transactionalId=java-kafka-streams-0_0] Transition from state IN_TRANSACTION 
to error state ABORTABLE_ERROR
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 
bytes when serialized which is larger than 1048576, which is the value of the 
max.request.size configuration.
2023-07-26 21:27:19 DEBUG StreamThread:825 - stream-thread 
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
Processed 1 records with 1 iterations; invoking punctuators if necessary
2023-07-26 21:27:19 DEBUG StreamThread:837 - stream-thread 
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 0 
punctuators ran.
2023-07-26 21:27:19 DEBUG StreamThread:1117 - stream-thread 
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
Committing all active tasks [0_0] an
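
For readers following the ticket, the sketch below shows the kind of handler and configuration the reproduction procedure describes: a ProductionExceptionHandler that returns CONTINUE for RecordTooLargeException, registered with exactly-once enabled. Class name and config values are illustrative; this is not the attached Reproducer.java.

{code:java}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

import java.util.Map;
import java.util.Properties;

public class ContinueOnTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        // CONTINUE only for the oversized-record case; FAIL for anything else.
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    // Example wiring with EOS enabled, as in the reproduction procedure.
    public static Properties exampleConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reproducer-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueOnTooLargeHandler.class);
        return props;
    }
}
{code}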

[jira] [Updated] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-07 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-15259:
---
Summary: Kafka Streams does not continue processing due to rollback despite 
ProductionExceptionHandlerResponse.CONTINUE if using exactly_once  (was: Kafka 
Streams does not continue processing due to rollback despite 
ProductionExceptionHandlerResponse.CONTINUE if using execute_once)

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Transiting to abortable error state 
> due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger t

[jira] [Updated] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once

2023-08-07 Thread Tomonari Yamashita (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomonari Yamashita updated KAFKA-15259:
---
Description: 
[Problem]
 - Kafka Streams does not continue processing due to rollback despite 
ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
 -- "CONTINUE will signal that Streams should ignore the issue and continue 
processing"(1), so Kafka Streams should continue processing even if using 
exactly_once when ProductionExceptionHandlerResponse.CONTINUE used.
 -- However, if using execute_once, Kafka Streams does not continue processing 
due to rollback despite ProductionExceptionHandlerResponse.CONTINUE. And the 
client will be shut down as the default 
behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 

[Environment]
 - Kafka Streams 3.5.1

[Reproduction procedure]
 # Create "input-topic" topic and "output-topic"
 # Put several messages on "input-topic"
 # Execute a simple Kafka streams program that transfers too large messages 
from "input-topic" to "output-topic" with exactly_once and returns 
ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
producer. Please refer to the reproducer program (attached file: 
Reproducer.java).
 # ==> However, Kafka Streams does not continue processing due to rollback 
despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
shutdown as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
(2). Please refer to the debug log (attached file: app_exactly_once.log).
 ## My expected behavior is that Kafka Streams should continue processing even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is used.

[As far as my investigation]
 - FYI, if using at_least_once instead of execute_once, Kafka Streams continue 
processing without rollback when ProductionExceptionHandlerResponse.CONTINUE is 
used. Please refer to the debug log (attached file: app_at_least_once.log).
- "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
Streams 3.2.0, as rollback occurs.

(1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
 - 
[https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]

(2) Transaction abort and shutdown occur
{code:java}
2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
 transactionalId=java-kafka-streams-0_0] Exception occurred during message send:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 
bytes when serialized which is larger than 1048576, which is the value of the 
max.request.size configuration.
2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
stream-task [0_0] Error encountered sending record to topic output-topic for 
task 0_0 due to:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 
bytes when serialized which is larger than 1048576, which is the value of the 
max.request.size configuration.
Exception handler choose to CONTINUE processing in spite of this error but 
written offsets would not be recorded.
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 
bytes when serialized which is larger than 1048576, which is the value of the 
max.request.size configuration.
2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
 transactionalId=java-kafka-streams-0_0] Transiting to abortable error state 
due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 
1188 bytes when serialized which is larger than 1048576, which is the value 
of the max.request.size configuration.
2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer 
clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
 transactionalId=java-kafka-streams-0_0] Transition from state IN_TRANSACTION 
to error state ABORTABLE_ERROR
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 
bytes when serialized which is larger than 1048576, which is the value of the 
max.request.size configuration.
2023-07-26 21:27:19 DEBUG StreamThread:825 - stream-thread 
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
Processed 1 records with 1 iterations; invoking punctuators if necessary
2023-07-26 21:27:19 DEBUG StreamThread:837 - stream-thread 
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 0 
punctuators ran.
2023-07-26 21:27:19 DEBUG StreamThread:1117 - stream-thread 
[java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
Committing all active tasks [0_0] an

[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Tomonari Yamashita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751862#comment-17751862
 ] 

Tomonari Yamashita commented on KAFKA-15259:


Thank you for creating the new ticket for Kafka Producer: 
https://issues.apache.org/jira/browse/KAFKA-15309. I think that similar 
problems occur not only with Kafka Streams but also with Kafka Producer, so I 
agree that it needs to be resolved first in the Kafka Producer ticket.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Transiting to abortable error state 
> due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 

[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751836#comment-17751836
 ] 

Matthias J. Sax commented on KAFKA-15302:
-

{quote}it's a combo of a cache snapshot and the underlying RocksDB's
{quote}
I think this is actually the bug: we don't return a snapshot over the 
cache. If it were a snapshot, flushing the cache would not modify it, but 
it does, as you pointed out:
{quote}that modifying the store for `keyA` might change the content of `KeyB` 
compared with the snapshot
{quote}
I am totally in favor of the "decouple flushing with forwarding" idea, 
independent of this ticket. But that's a larger piece of work anyway, and I am 
also not sure whether we would want to make such a change in place, or via DSL 2.0?
{quote}letting any range queries to flush cache first, and then only return 
from the underlying store.
{quote}
That's an interesting idea. A naive fix would be to make a "shallow 
copy (with copy-on-write)" of the named cache when opening an iterator, to 
guard the shallow copy against eviction. But this is also hard to get right and 
could be very memory intensive...

In case we cannot provide a fix easily, updating the docs is for sure something 
we should do.
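
To make the shallow-copy idea concrete, here is a rough sketch (a hypothetical CowCache class, not Kafka's NamedCache API): the iterator works over a copy of the entry map, so later flushes or evictions of the live cache cannot change what the iterator observes. The eager copy is exactly where the memory concern comes from; a true copy-on-write variant would defer the copy until the cache mutates.
{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for a named cache; not Kafka's NamedCache API.
public class CowCache<K extends Comparable<K>, V> {
    private final ConcurrentSkipListMap<K, V> live = new ConcurrentSkipListMap<>();

    void put(final K key, final V value) {
        live.put(key, value);
    }

    // Opening an iterator takes a shallow copy of the entry map (keys and values
    // are shared, only the map structure is copied), so subsequent evictions or
    // flushes of the live cache are invisible to the iterator.
    Iterator<Map.Entry<K, V>> snapshotIterator() {
        final TreeMap<K, V> copy = new TreeMap<>(live);
        return copy.entrySet().iterator();
    }

    public static void main(final String[] args) {
        final CowCache<String, Integer> cache = new CowCache<>();
        cache.put("a", 1);
        final Iterator<Map.Entry<String, Integer>> iter = cache.snapshotIterator();
        cache.put("a", 2); // mutation after the snapshot is taken
        System.out.println(iter.next()); // still prints a=1
    }
}
{code}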

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been updated from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg =
>             new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-

[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751835#comment-17751835
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Ah. Sorry. Meant https://issues.apache.org/jira/browse/KAFKA-15309 (sorry for 
the c&p error – fixed in the above two comments).
{quote}KS could tell KP to not transit to errors for any `abortable` error type 
but just ignore and continue?
{quote}
That's not so easy. Currently, the producer goes into ERROR state, so we would 
need to close the producer and create a new one, including falling back to the 
last committed offset and retrying the failed transaction. If the 
error is "deterministic" (like a `RecordTooLargeException`) we would just hit 
it again on retry, and would need to do more bookkeeping to actually avoid 
hitting the error again. That's why it seems simpler to skip the error 
inside the producer (-> https://issues.apache.org/jira/browse/KAFKA-15309) so 
that the producer does not go into ERROR state to begin with.
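
For context, the Streams-side handler this ticket is about looks roughly like the sketch below (the class name and the RecordTooLargeException special-casing are illustrative, not the reporter's code). Today, under EOS, returning CONTINUE still leaves the transaction aborted, which is the behavior described above.
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Sketch: skip oversized records, fail on everything else.
public class ContinueOnRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}
Such a handler is registered via the default.production.exception.handler setting (StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG) mentioned in the ticket.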

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of

[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751759#comment-17751759
 ] 

Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:59 PM:
--

{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15309 
for the producer already. If it's not covering what you think we need, just 
leave a comment on the ticket.


was (Author: mjsax):
{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created 
-https://issues.apache.org/jira/browse/KAFKA-15259- 
https://issues.apache.org/jira/browse/KAFKA-15309 for the producer already. If 
it's not covering what you think we need, just leave a comment on the ticket.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.requ

[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751759#comment-17751759
 ] 

Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM:
--

{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created 
-https://issues.apache.org/jira/browse/KAFKA-15259- 
https://issues.apache.org/jira/browse/KAFKA-15309 for the producer already. If 
it's not covering what you think we need, just leave a comment on the ticket.


was (Author: mjsax):
{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15259 
for the producer already. If it's not covering what you think we need, just 
leave a comment on the ticket.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.requ

[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751267#comment-17751267
 ] 

Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM:
--

I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we came up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
https://issues.apache.org/jira/browse/KAFKA-15309.

If we cannot do K15259, an alternative might be to add an internal producer 
config that allows Kafka Streams to disable the proactive abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit; the 
issue K9279 addresses is actually bad user behavior of not checking for async 
errors before committing.
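
As a rough illustration of that "good citizen" pattern (plain producer API, not Streams internals; the broker address, transactional id, and topic are made up): flush, check the async send results, and only then commit or abort.
{code:java}
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FlushThenCommitExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
        props.put("transactional.id", "example-tx");        // hypothetical transactional id
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();

            final AtomicReference<Exception> sendError = new AtomicReference<>();
            producer.send(new ProducerRecord<>("output-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            sendError.compareAndSet(null, exception);
                        }
                    });

            // Flush so all send callbacks have fired, then check for async errors
            // before deciding whether to commit or abort.
            producer.flush();
            if (sendError.get() == null) {
                producer.commitTransaction();
            } else {
                producer.abortTransaction();
            }
        }
    }
}
{code}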


was (Author: mjsax):
I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we came up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
-https://issues.apache.org/jira/browse/KAFKA-15259- 
https://issues.apache.org/jira/browse/KAFKA-15309.

If we cannot do K15259, an alternative might be to add an internal producer 
config that allows Kafka Streams to disable the proactive abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit; the 
issue K9279 addresses is actually bad user behavior of not checking for async 
errors before committing.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred dur

[jira] [Comment Edited] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751267#comment-17751267
 ] 

Matthias J. Sax edited comment on KAFKA-15259 at 8/7/23 11:58 PM:
--

I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we came up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
-https://issues.apache.org/jira/browse/KAFKA-15259- 
https://issues.apache.org/jira/browse/KAFKA-15309.

If we cannot do K15259, an alternative might be to add an internal producer 
config that allows Kafka Streams to disable the proactive abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit; the 
issue K9279 addresses is actually bad user behavior of not checking for async 
errors before committing.


was (Author: mjsax):
I did sync up with [~cegerton] who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we came up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
https://issues.apache.org/jira/browse/KAFKA-15259 

If we cannot do K15259, an alternative might be to add an internal producer 
config that allows Kafka Streams to disable the proactive abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen and calls 
`producer.flush()` and evaluates all callbacks before trying to commit; the 
issue K9279 addresses is actually bad user behavior of not checking for async 
errors before committing.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred dur

[jira] [Commented] (KAFKA-15197) Flaky test MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()

2023-08-07 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751833#comment-17751833
 ] 

Greg Harris commented on KAFKA-15197:
-

[~divijvaidya] I apologize for the flakiness of the test; it's a lot worse than 
I expected.

I've opened a patch that should fix the bug and stabilize the test, which we can 
hopefully get merged this week or next.

> Flaky test 
> MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
> --
>
> Key: KAFKA-15197
> URL: https://issues.apache.org/jira/browse/KAFKA-15197
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.6.0
>
>
> As of Jul 17th, this is the second most flaky test in our CI on trunk and 
> fails 46% of times. 
> See: 
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe/Berlin]
>  
> Note that MirrorConnectorsIntegrationExactlyOnceTest has multiple tests but 
> testOffsetTranslationBehindReplicationFlow is the one that is the reason for 
> most failures. see: 
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe/Berlin&tests.container=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest]
>  
>  
> Reason for failure is: 
> |org.opentest4j.AssertionFailedError: Condition not met within timeout 2. 
> Offsets for consumer group consumer-group-lagging-behind not translated from 
> primary for topic primary.test-topic-1 ==> expected: <true> but was: <false>|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-07 Thread via GitHub


gharris1727 commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1286472665


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.

Review Comment:
   I've rewritten this to make the previous paragraph more terse, and to 
explain a bit more in this paragraph about why the service_load strategy is 
faster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751830#comment-17751830
 ] 

Guozhang Wang commented on KAFKA-15302:
---

In this case I'm not sure if this is defined as a bug that should be fixed in 
KS: the semantics of `store.all()` is to return a snapshot (behind the scenes, 
it's a combo of a cache snapshot and the underlying RocksDB's snapshot) at the 
time of the call, and if the store gets modified after the `all()` call, we do 
not guarantee that the entries from the `all()` iterator are consistent with 
`get()`, which is meant to return the latest value.

What's confusing, though, is that users may not expect that modifying the 
store for `keyA` might change the content of `KeyB` compared with the snapshot 
(as has happened here due to evictions), but unless we change 
how we execute the caching layer's put/delete calls and how they can trigger 
eviction (of other keys), this will always be the case. For now the only thing 
we can probably do is to clarify it in the docs.

In the long run, if we feel such confusion is really unintuitive, we could 
consider 1) decoupling flushing from forwarding, and then 2) letting any range 
query flush the cache first and then return only from the underlying store.
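
Until then, a minimal sketch of the workaround the reporter mentions (flush the store before iterating) inside a punctuator; the processor class, the "Counts" store name, and the 10-second interval are illustrative, not taken from the reproducer.
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountForwarder implements Processor<String, Integer, String, Integer> {
    private KeyValueStore<String, Integer> kvStore;
    private ProcessorContext<String, Integer> context;

    @Override
    public void init(final ProcessorContext<String, Integer> context) {
        this.context = context;
        this.kvStore = context.getStateStore("Counts");
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            kvStore.flush(); // push cached entries down so all() and get() agree
            try (KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value, context.currentSystemTimeMs()));
                }
            }
        });
    }

    @Override
    public void process(final Record<String, Integer> record) {
        kvStore.put(record.key(), record.value());
    }
}
{code}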

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been updated from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
>     System.err.println("forwardAll Start");
>     KeyValueIterator<String, Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue<String, Integer> entry = kvList.next();
>         final Record<String, Integer> msg =
>             new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [951

[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-07 Thread via GitHub


gharris1727 commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1286461778


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.
+
+Verifying Plugin 
Compatibility
+
+To verify if all of your plugins are compatible, first ensure that you 
are using version 3.6 or later of the Connect runtime. You can then perform one 
of the following checks:
+
+
+Start your worker with the default 
HYBRID_WARN strategy, and WARN logs enabled for the 
org.apache.kafka.connect package. At least one WARN log message 
mentioning the plugin.discovery configuration should be printed. 
This log message will explicitly say that all plugins are compatible, or list 
the incompatible plugins.
+Start your worker in a test environment with 
HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and 
all incompatible plugins will be listed in the exception.
+
+
+If the verification step succeeds, then your current set of installed 
plugins are compatible, and it should be safe to change the 
plugin.discovery configuration to SERVICE_LOAD. If 
you change the set of already-installed plugins, they may no longer be 
compatible, and you should repeat the above verification. If the verification 
fails, you must address the incompatible plugins before using the 
SERVICE_LOAD strategy.
+
+Operators: Artifact 
Migration
+
+As an operator of Connect, if you discover incompatible plugins, there 
are multiple ways to try to resolve the incompatibility. They are listed below 
from most to least preferable.
+
+
+Upgrade your incompatible plugins to the latest release version 
from your plugin provider.
+Contact your plugin provider and request that they migrate the 
plugin to be compatible, following the source migration 
instructions, and then upgrade to the migrated version.
+Migrate the plugin artifacts yourself using the included migration 
script.
+
+
+The migration script is located in 
bin/connect-plugin-path.sh and 
bin\windows\connect-plugin-path.bat of your Kafka installation. 
The script can migrate incompatible plugin artifacts already installed on your 
Connect worker's plugin.path by adding or modifying JAR or 
resource files. This is not suitable for environments using code-signing, as 
this may change the artifacts such that they will fail signature verification. 
View the built-in help with --help.
+
+To perform a migration, first use the list subcommand to 
get an overview of the plugins available to the script. You must tell the 
script where to find plugins, which can be done with the repeatable 
--worker-config, --plugin-path, and 
--plugin-location arguments. The script will only migrate plugins 
present in the paths specified, so if you add plugins to your worker's 
classpath, then you will need to specify those plugins via one or more 
--plugin-location arguments.
+
+Once you see that all incompatible plugins are included in the listing, 
you can proceed to dry-run the migration with sync-manifests 
--dry-run. This will perform all parts of the migration, except for 
writing the results of the migration to disk. Note that the 
sync-manifests command requires all specified paths to be 
writable, and may alter the contents of the directories. Make a backup of the 
specified paths, or copy them to a writable directory.
+
+Ensure that you have a backup and the dry-run succeeds before removing 
the --dry-run flag and actually running the migration. If the 
migration fails without the --dry-run flag, then the partially 
migrated artifacts should be discarded. The migration is idempotent, so running 
it multiple times and on already-migrated plugins is safe. After the migration 
is completed, you should v

[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-07 Thread via GitHub


gharris1727 commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1286460752


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.
+
+Verifying Plugin 
Compatibility
+
+To verify if all of your plugins are compatible, first ensure that you 
are using version 3.6 or later of the Connect runtime. You can then perform one 
of the following checks:
+
+
+Start your worker with the default 
HYBRID_WARN strategy, and WARN logs enabled for the 
org.apache.kafka.connect package. At least one WARN log message 
mentioning the plugin.discovery configuration should be printed. 
This log message will explicitly say that all plugins are compatible, or list 
the incompatible plugins.
+Start your worker in a test environment with 
HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and 
all incompatible plugins will be listed in the exception.
+
+
+If the verification step succeeds, then your current set of installed 
plugins are compatible, and it should be safe to change the 
plugin.discovery configuration to SERVICE_LOAD. If 
you change the set of already-installed plugins, they may no longer be 
compatible, and you should repeat the above verification. If the verification 
fails, you must address the incompatible plugins before using the 
SERVICE_LOAD strategy.
+
+Operators: Artifact 
Migration
+
+As an operator of Connect, if you discover incompatible plugins, there 
are multiple ways to try to resolve the incompatibility. They are listed below 
from most to least preferable.
+
+
+Upgrade your incompatible plugins to the latest release version 
from your plugin provider.
+Contact your plugin provider and request that they migrate the 
plugin to be compatible, following the source migration 
instructions, and then upgrade to the migrated version.
+Migrate the plugin artifacts yourself using the included migration 
script.
+
+
+The migration script is located in 
bin/connect-plugin-path.sh and 
bin\windows\connect-plugin-path.bat of your Kafka installation. 
The script can migrate incompatible plugin artifacts already installed on your 
Connect worker's plugin.path by adding or modifying JAR or 
resource files. This is not suitable for environments using code-signing, as 
this may change the artifacts such that they will fail signature verification. 
View the built-in help with --help.
+
+To perform a migration, first use the list subcommand to 
get an overview of the plugins available to the script. You must tell the 
script where to find plugins, which can be done with the repeatable 
--worker-config, --plugin-path, and 
--plugin-location arguments. The script will only migrate plugins 
present in the paths specified, so if you add plugins to your worker's 
classpath, then you will need to specify those plugins via one or more 
--plugin-location arguments.
+
+Once you see that all incompatible plugins are included in the listing, 
you can proceed to dry-run the migration with sync-manifests 
--dry-run. This will perform all parts of the migration, except for 
writing the results of the migration to disk. Note that the 
sync-manifests command requires all specified paths to be 
writable, and may alter the contents of the directories. Make a backup of the 
specified paths, or copy them to a writable directory.

Review Comment:
   I added a "take note" mention to the verification section, and a "Be sure to 
compare" mention here as a call back. It also meshes well with the special case 
for the classpath plugins mentioned immediately before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specifi

[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751824#comment-17751824
 ] 

Guozhang Wang commented on KAFKA-15259:
---

[~mjsax] Did you mean a different ticket other than KAFKA-15259 (which is this 
ticket)? BTW, I think after the change you proposed, we could still improve the 
KS layer's handling of abortable and fatal txn errors from underlying producers 
on top of what's summarized in KIP-691: if people agree that when KS's 
exception handler is configured to CONTINUE, then for `RecordTooLargeException` 
etc. (maybe all exceptions causing `abortable` rather than `fatal` errors as in 
KIP-691), KS could tell KP to not transition to an error state for any 
`abortable` error type but just ignore it and continue?

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  Trans

[GitHub] [kafka] gharris1727 commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-07 Thread via GitHub


gharris1727 commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1286456172


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.
+
+Verifying Plugin 
Compatibility
+
+To verify if all of your plugins are compatible, first ensure that you 
are using version 3.6 or later of the Connect runtime. You can then perform one 
of the following checks:
+
+
+Start your worker with the default 
HYBRID_WARN strategy, and WARN logs enabled for the 
org.apache.kafka.connect package. At least one WARN log message 
mentioning the plugin.discovery configuration should be printed. 
This log message will explicitly say that all plugins are compatible, or list 
the incompatible plugins.
+Start your worker in a test environment with 
HYBRID_FAIL. If all plugins are compatible, startup will succeed. 
If at least one plugin is not compatible the worker will fail to start up, and 
all incompatible plugins will be listed in the exception.
+
+
+If the verification step succeeds, then your current set of installed 
plugins are compatible, and it should be safe to change the 
plugin.discovery configuration to SERVICE_LOAD. If 
you change the set of already-installed plugins, they may no longer be 
compatible, and you should repeat the above verification. If the verification 
fails, you must address the incompatible plugins before using the 
SERVICE_LOAD strategy.
+
+Operators: Artifact 
Migration
+
+As an operator of Connect, if you discover incompatible plugins, there 
are multiple ways to try to resolve the incompatibility. They are listed below 
from most to least preferable.
+
+
+Upgrade your incompatible plugins to the latest release version 
from your plugin provider.
+Contact your plugin provider and request that they migrate the 
plugin to be compatible, following the source migration 
instructions, and then upgrade to the migrated version.
+Migrate the plugin artifacts yourself using the included migration 
script.
+
+
+The migration script is located in 
bin/connect-plugin-path.sh and 
bin\windows\connect-plugin-path.bat of your Kafka installation. 
The script can migrate incompatible plugin artifacts already installed on your 
Connect worker's plugin.path by adding or modifying JAR or 
resource files. This is not suitable for environments using code-signing, as 
this may change the artifacts such that they will fail signature verification. 
View the built-in help with --help.
+
+To perform a migration, first use the list subcommand to 
get an overview of the plugins available to the script. You must tell the 
script where to find plugins, which can be done with the repeatable 
--worker-config, --plugin-path, and 
--plugin-location arguments. The script will only migrate plugins 
present in the paths specified, so if you add plugins to your worker's 
classpath, then you will need to specify those plugins via one or more 
--plugin-location arguments.

Review Comment:
   > Hmmm... I agree with the reservations about modifying JAR files in-flight. 
Do we have to actually write to those JAR files, though? Could we create/modify 
a service loader manifest on the file system instead of inside a JAR file?
   > I know this is a little inelegant but IMO it's worth considering since it 
would reduce the potential for footguns related to classpath plugins and would 
increase the general utility of the CLI tool.
   
   If we did this, we'd end up with the same problem one iteration later. 
Suppose we were able to find the kafka lib directory, and knew that we were 
running via `kafka-run-class.sh`, so the manifest shim jar would be picked up 
the next time the script is run. Because it's on the classpath, we wouldn't be 
able to mutate it safely, so we could only append manifests by adding new shim 
jar
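As an aside for readers of the docs hunk quoted above, here is a minimal sketch of the two pieces it describes. The plugin.discovery property, the SERVICE_LOAD value, and the list subcommand with its --worker-config/--plugin-path flags come from the diff itself; the file and directory paths below are hypothetical placeholders, not values from the PR.

```
# Worker configuration sketch: only switch to SERVICE_LOAD after the
# verification steps above have confirmed every installed plugin is compatible.
# The plugin directory is a hypothetical placeholder.
plugin.discovery=SERVICE_LOAD
plugin.path=/opt/connect/plugins
```

```
# Migration-script sketch: list the plugins the script can see, reading the
# plugin.path from an existing worker config (paths are hypothetical).
bin/connect-plugin-path.sh list \
  --worker-config config/connect-distributed.properties \
  --plugin-path /opt/connect/plugins
```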

[GitHub] [kafka] guozhangwang commented on a diff in pull request #14149: HOTFIX: avoid placement of unnecessary transient standby tasks

2023-08-07 Thread via GitHub


guozhangwang commented on code in PR #14149:
URL: https://github.com/apache/kafka/pull/14149#discussion_r1286454991


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -618,18 +618,24 @@ private boolean assignTasksToClients(final Cluster 
fullMetadata,
 final boolean lagComputationSuccessful =
 populateClientStatesMap(clientStates, clientMetadataMap, 
taskForPartition, changelogTopics);
 
-log.info("{} members participating in this rebalance: \n{}.",
-clientStates.size(),
-clientStates.entrySet().stream()
-.sorted(comparingByKey())
-.map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
-.collect(Collectors.joining(Utils.NL)));
+
+log.info("{} client nodes and {} consumers participating in this 
rebalance: \n{}.",
+ clientStates.size(),
+ 
clientStates.values().stream().map(ClientState::capacity).reduce(Integer::sum),
+ clientStates.entrySet().stream()
+ .sorted(comparingByKey())
+ .map(entry -> entry.getKey() + ": " + 
entry.getValue().consumers())
+ .collect(Collectors.joining(Utils.NL)));
 
 final Set allTasks = partitionsForTask.keySet();
 statefulTasks.addAll(changelogTopics.statefulTaskIds());
 
-log.debug("Assigning tasks {} including stateful {} to clients {} with 
number of replicas {}",
-allTasks, statefulTasks, clientStates, numStandbyReplicas());
+log.info("Assigning stateful tasks: {}\n"
+ + "and stateless tasks: {}",
+ statefulTasks,
+ allTasks.removeAll(statefulTasks));

Review Comment:
   Won't this change the content of `allTasks`? Also `removeAll` returns a 
boolean.
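As an illustration of the point above (a standalone toy example, not the PR's code): copying the set before calling removeAll keeps the original collection intact and yields the stateless set, instead of mutating the original and producing a boolean.

```
import java.util.HashSet;
import java.util.Set;

public class StatelessTaskLogExample {
    public static void main(String[] args) {
        // Hypothetical task ids standing in for TaskId instances.
        Set<String> allTasks = new HashSet<>(Set.of("0_0", "0_1", "1_0"));
        Set<String> statefulTasks = Set.of("0_0", "1_0");

        // Copy first: Set#removeAll mutates the receiver and returns a boolean,
        // so calling it on allTasks directly would both corrupt that set and
        // log "true"/"false" instead of the stateless tasks.
        Set<String> statelessTasks = new HashSet<>(allTasks);
        statelessTasks.removeAll(statefulTasks);

        System.out.println("stateful  = " + statefulTasks);
        System.out.println("stateless = " + statelessTasks);
        System.out.println("allTasks  = " + allTasks); // still holds all three ids
    }
}
```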



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -1063,9 +1071,13 @@ private Map> 
buildStandbyTaskMap(final String consum
 }
 
 for (final TaskId task : revokedTasks) {
-if (allStatefulTasks.contains(task)) {

Review Comment:
   Not sure I fully understand the change here either. Would appreciate some 
more explanations --- specifically, could you give a simple example in the PR 
description?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order

2023-08-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751821#comment-17751821
 ] 

Guozhang Wang commented on KAFKA-15297:
---

I think this is indeed a general issue: state stores are initialized in 
the order of the topology, which is essentially the "processor node order" 
used in ``InternalTopologyBuilder#build``. This works when a state store is only 
associated with one processor, or when a store is associated with multiple 
processors but they are built as part of a built-in operator (like a join in 
the DSL), in which case we carefully make sure that the state store order is 
consistent with the processor order; but in a PAPI scenario like the one Bruno 
reported here, all bets are off.

I think a general fix would be: in the above ``build`` function, only build 
the processors in the first loop pass without initializing the state stores, 
and then, based on the order of the built processors, determine the order in 
which the state stores should be initialized, and do that in a second pass.

> Cache flush order might not be topological order 
> -
>
> Key: KAFKA-15297
> URL: https://issues.apache.org/jira/browse/KAFKA-15297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Bruno Cadonna
>Priority: Major
> Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not 
> correspond to the topological order of the state stores in the topology. The 
> order depends on how the processors and state stores are added to the 
> topology. 
> In some cases downstream state stores might be flushed before upstream state 
> stores. That means, that during a commit records in upstream caches might end 
> up in downstream caches that have already been flushed during the same 
> commit. If a crash happens at that point, those records in the downstream 
> caches are lost. Those records are lost for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the 
> cache. However, the downstream caches have already been flushed and they will 
> not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that now are 
> blocked in the downstream caches are committed during the same commit and so 
> they will not be re-processed after the crash.
> An example for a topology where the flush order of the caches is wrong is the 
> following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .process(
> () -> new Processor<String, String, String, String>() {
> private ProcessorContext<String, String> context;
> @Override
> public void init(ProcessorContext<String, String> context) {
> this.context = context;
> }
> @Override
> public void process(Record<String, String> record) {
> context.forward(record);
> }
> @Override
> public void close() {}
> },
> Named.as("processor1")
> )
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), 
> Serdes.String()))
> .toTable(Materialized.<String, String, KeyValueStore<Bytes, 
> byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, 
> byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, 
> byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
> .toStream()
> .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If 
> {{processor1}} is added to the topology before the other processors, i.e., if 
> the right branch of the topology is added before the left branch as in the 
> code above, the cache of {{stateStoreC}} is flushed before the caches of 
> {{stateStoreA}} and {{stateStoreB}}.
> You can observe the flush order by feeding some records into the input topics 
> of the topology, waiting for a commit,  and looking for the following log 
> message:
> https://github.com/apache/kaf

[GitHub] [kafka] lihaosky commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor

2023-08-07 Thread via GitHub


lihaosky commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1286439107


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java:
##
@@ -56,52 +67,118 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getProcessRacksForAllProcess;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomCluster;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomProcessRacks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskChangelogMapForAllTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTopologyGroupTaskMap;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForChangelog;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.verifyStandbySatisfyRackReplica;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(Parameterized.class)
 public class HighAvailabilityTaskAssignorTest {
-private final AssignmentConfigs configWithoutStandbys = new 
AssignmentConfigs(
-/*acceptableRecoveryLag*/ 100L,
-/*maxWarmupReplicas*/ 2,
-/*numStandbyReplicas*/ 0,
-/*probingRebalanceIntervalMs*/ 60 * 1000L,
-/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-);
-
-private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
-/*acceptableRecoveryLag*/ 100L,
-/*maxWarmupReplicas*/ 2,
-/*numStandbyReplicas*/ 1,
-/*probingRebalanceIntervalMs*/ 60 * 1000L,
-/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-);
+private AssignmentConfigs getConfigWithoutStandbys() {
+return new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 0,
+/*probingRebalanceIntervalMs*/ 60 * 1000L,
+/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+null,
+null,
+rackAwareStrategy
+);
+}
+
+private AssignmentConfigs getConfigWithStandbys() {
+return getConfigWithStandbys(1);
+}
+
+private AssignmentConfigs getConfigWithStandbys(final int replicaNum) {
+return new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ replicaNum,
+/*probingRebalanceIntervalMs*/ 60 * 1000L,
+/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+null,
+null,
+rackAwareStrategy
+);
+}
+
+@Parameter
+public boolean enableRackAwareTaskAssignor;
+
+private String rackAwareStrategy = 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
+
+@Before
+public void setUp() {
+if (enableRackAwareTaskAssignor) {
+

[GitHub] [kafka] lihaosky commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor

2023-08-07 Thread via GitHub


lihaosky commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1286438784


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -124,11 +133,20 @@ private static void assignActiveStatefulTasks(final 
SortedMap
 ClientState::assignActive,
 (source, destination) -> true
 );
+
+if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() 
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {

Review Comment:
   I think currently it's because there are tests that haven't been modified yet 
and still pass null. I can remove this in my next PR, after I've modified all the 
`HAAssignor` tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15281) Implement the groupMetadata Consumer API

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15281:
-

Assignee: (was: Kirk True)

> Implement the groupMetadata Consumer API
> 
>
> Key: KAFKA-15281
> URL: https://issues.apache.org/jira/browse/KAFKA-15281
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The threading refactor project needs to implement the {{groupMetadata()}} API 
> call once support for the KIP-848 protocol is implemented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15282:
-

Assignee: (was: Kirk True)

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
>
> The client-side assignor provides the logic for the partition assignments 
> instead of on the server. Client-side assignment is the main approach used by 
> the “old protocol” for divvying up partitions. While the “new protocol” 
> favors server-side assignors, the client-side assignor will continue to be 
> used for backward compatibility, including KSQL, Connect, etc.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces unencumbered, unless it’s unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15280) Implement client support for KIP-848 server-side assignors

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15280:
-

Assignee: (was: Kirk True)

> Implement client support for KIP-848 server-side assignors
> --
>
> Key: KAFKA-15280
> URL: https://issues.apache.org/jira/browse/KAFKA-15280
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
>
> This includes:
>  * Validate the client’s configuration for assignor selection
>  * Validate the request/response from the {{ConsumerGroupHeartbeat}} RPC call
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15284:
-

Assignee: (was: Kirk True)

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the—deep 
> breath—{{{}ConsumerGroupProtocolVersionResolver{}}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15279) Implement client support for KIP-848 assignment RPCs

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15279:
-

Assignee: (was: Kirk True)

> Implement client support for KIP-848 assignment RPCs
> 
>
> Key: KAFKA-15279
> URL: https://issues.apache.org/jira/browse/KAFKA-15279
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
>  # 
> [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]
>  # 
> [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]
> Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to 
> implement the ConsumerGroupAssignmentRequestManager to handle the second and 
> third RPCs on the above list.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15278:
-

Assignee: (was: Kirk True)

> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
>  # 
> [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]
>  # 
> [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]
> The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15306:
--
Labels: consumer-threading-refactor  (was: client consumer)

> Integrate committed offsets logic when updating fetching positions
> --
>
> Key: KAFKA-15306
> URL: https://issues.apache.org/jira/browse/KAFKA-15306
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Integrate refreshCommittedOffsets logic, currently performed by the 
> coordinator, into the update fetch positions performed on every iteration of 
> the async consumer poll loop. This should rely on the CommitRequestManager to 
> perform the request based on the refactored model, but it should reuse the 
> logic for processing the committed offsets and updating the positions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15304:
--
Labels: consumer-threading-refactor  (was: )

> CompletableApplicationEvents aren't being completed when the consumer is 
> closing
> 
>
> Key: KAFKA-15304
> URL: https://issues.apache.org/jira/browse/KAFKA-15304
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> If the background thread is closed before ingesting all ApplicationEvents, we 
> should drain the background queue and try to cancel these events before 
> closing. We can try to process these events before closing down the consumer; 
> however, we assume that when the user issues a close command, the consumer 
> should be shut down promptly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15306:
--
Labels: client consumer  (was: client consumer kip-945)

> Integrate committed offsets logic when updating fetching positions
> --
>
> Key: KAFKA-15306
> URL: https://issues.apache.org/jira/browse/KAFKA-15306
> Project: Kafka
>  Issue Type: Task
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client, consumer
>
> Integrate refreshCommittedOffsets logic, currently performed by the 
> coordinator, into the update fetch positions performed on every iteration of 
> the async consumer poll loop. This should rely on the CommitRequestManager to 
> perform the request based on the refactored model, but it should reuse the 
> logic for processing the committed offsets and updating the positions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15305:
--
Component/s: clients

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> The close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down. The background thread 
> currently doesn't do that: when close() is initiated, it immediately closes 
> all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing. Maybe the correct thing to do is to first stop 
> accepting API requests; second, let runOnce() continue to run until the 
> shutdown timer expires; then we can force-close all of its dependencies.
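A toy sketch of the shutdown behaviour being proposed (hypothetical types, not the actual background-thread code): keep draining queued work until the close deadline, then fail whatever is left rather than dropping it silently.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class GracefulCloseSketch {
    static void close(LinkedBlockingQueue<CompletableFuture<Void>> events, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        CompletableFuture<Void> event;
        // Keep processing within the grace period supplied to close().
        while (System.currentTimeMillis() < deadline
                && (event = events.poll(10, TimeUnit.MILLISECONDS)) != null) {
            event.complete(null); // stand-in for actually handling the event
        }
        // Once the shutdown timer expires, cancel anything that is still queued.
        while ((event = events.poll()) != null) {
            event.completeExceptionally(new IllegalStateException("consumer is closing"));
        }
    }
}
```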



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15305:
--
Labels: consumer-threading-refactor  (was: )

> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> The close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down. The background thread 
> currently doesn't do that: when close() is initiated, it immediately closes 
> all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing. Maybe the correct thing to do is to first stop 
> accepting API requests; second, let runOnce() continue to run until the 
> shutdown timer expires; then we can force-close all of its dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15304:
--
Component/s: clients

> CompletableApplicationEvents aren't being completed when the consumer is 
> closing
> 
>
> Key: KAFKA-15304
> URL: https://issues.apache.org/jira/browse/KAFKA-15304
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>
> If the background thread is closed before ingesting all ApplicationEvents, we 
> should drain the background queue and try to cancel these events before 
> closing. We can try to process these events before closing down the consumer; 
> however, we assume that when the user issues a close command, the consumer 
> should be shut down promptly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions

2023-08-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15306:
--
Component/s: clients
 consumer

> Integrate committed offsets logic when updating fetching positions
> --
>
> Key: KAFKA-15306
> URL: https://issues.apache.org/jira/browse/KAFKA-15306
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client, consumer
>
> Integrate refreshCommittedOffsets logic, currently performed by the 
> coordinator, into the update fetch positions performed on every iteration of 
> the async consumer poll loop. This should rely on the CommitRequestManager to 
> perform the request based on the refactored model, but it should reuse the 
> logic for processing the committed offsets and updating the positions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #14163: MINOR: Improve JavaDocs of KafkaStreams `context.commit()`

2023-08-07 Thread via GitHub


mjsax commented on code in PR #14163:
URL: https://github.com/apache/kafka/pull/14163#discussion_r1286429053


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java:
##
@@ -805,12 +805,14 @@ static Map> 
getTopologyGroupTaskMap() {
 return Collections.singletonMap(SUBTOPOLOGY_0, 
Collections.singleton(new TaskId(1, 1)));
 }
 
-static void verifyStandbySatisfyRackReplica(final Set taskIds,
+static void verifyStandbySatisfyRackReplica(
+final Set taskIds,

Review Comment:
   Unrelated side cleanup to fix formatting



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-07 Thread via GitHub


kirktrue commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1286379164


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -77,17 +77,33 @@ public class CommonClientConfigs {
 public static final String DEFAULT_CLIENT_RACK = "";
 
 public static final String RECONNECT_BACKOFF_MS_CONFIG = 
"reconnect.backoff.ms";
-public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of 
time to wait before attempting to reconnect to a given host. This avoids 
repeatedly connecting to a host in a tight loop. This backoff applies to all 
connection attempts by the client to a broker.";
+public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of 
time to wait before attempting to reconnect to a given host. " +
+"This avoids repeatedly connecting to a host in a tight loop. This 
backoff applies to all connection attempts by the client to a broker. " +
+"This value is the initial backoff value and will increase 
exponentially for each consecutive connection failure, up to the 
reconnect.backoff.max.ms value.";
 
 public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = 
"reconnect.backoff.max.ms";
-public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum 
amount of time in milliseconds to wait when reconnecting to a broker that has 
repeatedly failed to connect. If provided, the backoff per host will increase 
exponentially for each consecutive connection failure, up to this maximum. 
After calculating the backoff increase, 20% random jitter is added to avoid 
connection storms.";
+public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum 
amount of time in milliseconds to wait when reconnecting to a broker that has 
repeatedly failed to connect. " +
+"If provided, the backoff per host will increase exponentially for 
each consecutive connection failure, up to this maximum. After calculating the 
backoff increase, 20% random jitter is added to avoid connection storms.";
 
 public static final String RETRIES_CONFIG = "retries";
 public static final String RETRIES_DOC = "Setting a value greater than 
zero will cause the client to resend any request that fails with a potentially 
transient error." +
 " It is recommended to set the value to either zero or `MAX_VALUE` and 
use corresponding timeout parameters to control how long a client should retry 
a request.";
 
 public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
-public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to 
wait before attempting to retry a failed request to a given topic partition. 
This avoids repeatedly sending requests in a tight loop under some failure 
scenarios.";
+public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to 
wait before attempting to retry a failed request to a given topic partition. " +
+"This avoids repeatedly sending requests in a tight loop under some 
failure scenarios. This value is the initial backoff value and will increase 
exponentially for each failed request, " +
+"up to the retry.backoff.max.ms value.";
+public static final Long DEFAULT_RETRY_BACKOFF_MS = 100L;
+
+public static final String RETRY_BACKOFF_MAX_MS_CONFIG = 
"retry.backoff.max.ms";
+public static final String RETRY_BACKOFF_MAX_MS_DOC = "The maximum amount 
of time in milliseconds to wait when retrying a request to the broker that has 
repeatedly failed. " +
+"If provided, the backoff per client will increase exponentially for 
each failed request, up to this maximum. To prevent all clients from being 
synchronized upon retry, " +
+"a randomized jitter with a factor of 0.2 will be applied to the 
backoff, resulting in the backoff falling within a range between 20% below and 
20% above the computed value. " +
+"If retry.backoff.ms is set to be higher than 
retry.backoff.max.ms, then retry.backoff.max.ms will 
be used as a constant backoff from the beginning without any exponential 
increase";

Review Comment:
   Interesting...
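For readers skimming the thread, a rough standalone sketch of the policy these doc strings describe: exponential growth from retry.backoff.ms, clamped at retry.backoff.max.ms, with 20% jitter. This is an illustration only, not the clients' actual ExponentialBackoff implementation, and the method name is made up.

```
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
    // backoff = min(retry.backoff.ms * 2^failures, retry.backoff.max.ms), then +/-20% jitter.
    // If retry.backoff.ms already exceeds retry.backoff.max.ms, the min() clamps every
    // attempt to the max, i.e. a constant backoff with no exponential growth.
    static long backoffMs(long retryBackoffMs, long retryBackoffMaxMs, int failures) {
        long exponential = retryBackoffMs * (1L << Math.min(failures, 30));
        long base = Math.min(exponential, retryBackoffMaxMs);
        double jitter = 0.8 + ThreadLocalRandom.current().nextDouble() * 0.4;
        return (long) (base * jitter);
    }

    public static void main(String[] args) {
        for (int failures = 0; failures <= 5; failures++) {
            System.out.printf("%d prior failures -> ~%d ms%n", failures, backoffMs(100L, 1000L, failures));
        }
    }
}
```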



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java:
##
@@ -372,10 +372,16 @@ private static ConfigDef config(Crypto crypto) {
 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)
 .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
 ConfigDef.Type.LONG,
-100L,
+CommonClientConfigs.DEFAULT_RETRY_BACKOFF_MS,
 atLeast(0L),
 ConfigDef.Importance.LOW,
 CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+.define(CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG,

Review Comment:
   Does `DistributedConfig` need to call that 
`CommonClientConfigs.warnDisablingExponentialBackoff()` method li

[GitHub] [kafka] gharris1727 commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-07 Thread via GitHub


gharris1727 commented on PR #14064:
URL: https://github.com/apache/kafka/pull/14064#issuecomment-1668643267

   I've also added a new warning to stdout which alerts users when they have 
incompatible plugins on the classpath that will not be auto-migrated. Currently 
without any manifests in the runtime (they're in a different PR) this is what 
it looks like:
   ```
   46 plugins on the classpath are not compatible, and will not be 
auto-migrated. Please move these plugins to a plugin path location.
   org.apache.kafka.connect.transforms.ReplaceField$Value
   org.apache.kafka.connect.converters.FloatConverter
   
   org.apache.kafka.connect.converters.ShortConverter
   org.apache.kafka.connect.transforms.predicates.TopicNameMatches
   Total plugins:  1
   Loadable plugins:   1
   Compatible plugins: 1
   Total locations:1
   Compatible locations:   1
   All locations compatible?   true
   ```
   
   They aren't part of the main table, and they aren't part of the summary.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-07 Thread via GitHub


gharris1727 commented on PR #14064:
URL: https://github.com/apache/kafka/pull/14064#issuecomment-1668640505

   > I think our workaround with a special path of classpath to denote 
classpath plugins in PluginUtils::classpathPluginSource should be improved. I 
built the basic auth extension, created a directory named classpath, moved the 
JAR file for the extension into that directory, and then ran 
bin/connect-plugin-path.sh list --plugin-location classpath. The end result was 
fairly confusing:
   
   Thanks for finding that, I've changed it so that PluginSource uses `null` as 
a sentinel value for the classpath, and changed all of the places it's printed 
to re-inject the `"classpath"` string after all the control flow has used 
`null`.
   
   I've replaced the makeUnique hack with Lists and counting instead of trying 
to make the ManifestEntry objects completely unique.
   
   > I've also finally gotten around to looking at the tests. It seems like we 
have a great foundation of scenarios to build on, but the assertions we're 
making are fairly minor. Can we add some coverage to verify, e.g., how many 
plugin rows are listed, the (partial or complete) contents of the post-table 
summary, and which plugins are described as being migrated vs. non-migrated?
   
   Yes the assertions are pretty sparse. I've been avoiding parsing the output 
while it was still in flux, but I think the format is starting to settle down 
now. I'll go through and see what I can add.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-07 Thread via GitHub


gharris1727 commented on code in PR #14064:
URL: https://github.com/apache/kafka/pull/14064#discussion_r1286420767


##
tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java:
##
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
+import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
+import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConnectPluginPath {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+private static final Object[] LIST_TABLE_COLUMNS = {
+"pluginName",
+"firstAlias",
+"secondAlias",
+"pluginVersion",
+"pluginType",
+"isLoadable",
+"hasManifest",
+"pluginLocation" // last because it is least important and most 
repetitive
+};
+
+public static void main(String[] args) {
+Exit.exit(mainNoExit(args, System.out, System.err));
+}
+
+public static int mainNoExit(String[] args, PrintStream out, PrintStream 
err) {
+ArgumentParser parser = parser();
+try {
+Namespace namespace = parser.parseArgs(args);
+Config config = parseConfig(parser, namespace, out);
+runCommand(config);
+return 0;
+} catch (ArgumentParserException e) {
+parser.handleError(e);
+return 1;
+} catch (TerseException e) {
+err.println(e.getMessage());
+return 2;
+} catch (Throwable e) {
+err.println(e.getMessage());
+err.println(Utils.stackTrace(e));
+return 3;
+}
+}
+
+private static ArgumentParser parser() {
+ArgumentParser parser = 
ArgumentParsers.newArgumentParser("connect-plugin-path")
+.defaultHelp(true)
+.description("Manage plugins on the Connect plugin.path");
+
+ArgumentParser listCommand = parser.addSubparsers()
+.description("List information about plugins contained within the 
specified plugin locations")
+.dest("subcommand")
+.addParser("list");
+
+ArgumentParser[] subparsers = new ArgumentParser[] {
+listCommand,
+};
+
+for (ArgumentParser subparser : subparsers) {
+ArgumentGroup pluginProviders = subparser.addArgumentGroup("plugin 
providers");
+pluginPro

[GitHub] [kafka] jsancio opened a new pull request, #14162: KAFKA-15312; Force channel before atomic file move

2023-08-07 Thread via GitHub


jsancio opened a new pull request, #14162:
URL: https://github.com/apache/kafka/pull/14162

   On ext4 file systems we have seen snapshots with zero-length files. This is 
possible if the file is closed and moved before forcing the channel to write to 
disk.
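A minimal sketch of the pattern described here, using a hypothetical helper rather than the actual FileRawSnapshotWriter change: write to a temporary file, force the channel, and only then do the atomic rename.

```
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class AtomicWriteSketch {
    // Hypothetical helper illustrating "force the channel before the atomic move".
    public static void writeAtomically(Path target, byte[] data) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".part");
        try (FileChannel channel = FileChannel.open(tmp,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            channel.write(ByteBuffer.wrap(data));
            // Without this fsync, ext4's delayed allocation can leave a
            // zero-length file behind if the system crashes after the rename.
            channel.force(true);
        }
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```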
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move

2023-08-07 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15312:
---
Description: 
Not all file systems fsync to disk on close. For KRaft to guarantee that the 
data has made it to disk before calling rename, it needs to make sure that the 
file has been fsynced.

We have seen cases where the snapshot file has zero-length data on the ext4 file 
system.
{quote} "Delayed allocation" means that the filesystem tries to delay the 
allocation of physical disk blocks for written data for as long as possible. 
This policy brings some important performance benefits. Many files are 
short-lived; delayed allocation can keep the system from writing fleeting 
temporary files to disk at all. And, for longer-lived files, delayed allocation 
allows the kernel to accumulate more data and to allocate the blocks for data 
contiguously, speeding up both the write and any subsequent reads of that data. 
It's an important optimization which is found in most contemporary filesystems.

But, if blocks have not been allocated for a file, there is no need to write 
them quickly as a security measure. Since the blocks do not yet exist, it is 
not possible to read somebody else's data from them. So ext4 will not (cannot) 
write out unallocated blocks as part of the next journal commit cycle. Those 
blocks will, instead, wait until the kernel decides to flush them out; at that 
point, physical blocks will be allocated on disk and the data will be made 
persistent. The kernel doesn't like to let file data sit unwritten for too 
long, but it can still take a minute or so (with the default settings) for that 
data to be flushed - far longer than the five seconds normally seen with ext3. 
And that is why a crash can cause the loss of quite a bit more data when ext4 
is being used. 
{quote}
from: [https://lwn.net/Articles/322823/]
{quote}auto_da_alloc ( * ), noauto_da_alloc

Many broken applications don't use fsync() when replacing existing files via 
patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", 
"foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If 
auto_da_alloc is enabled, ext4 will detect the replace-via-rename and 
replace-via-truncate patterns and force that any delayed allocation blocks are 
allocated such that at the next journal commit, in the default data=ordered 
mode, the data blocks of the new file are forced to disk before the rename() 
operation is committed. This provides roughly the same level of guarantees as 
ext3, and avoids the "zero-length" problem that can happen when a system 
crashes before the delayed allocation blocks are forced to disk.
{quote}
from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]

 

  was:
Not all file system fsync to disk on close. For KRaft to guarantee that the 
data has made it to disk before calling rename it needs to make sure that the 
file has been fsync.

We have seen cases were the snapshot file has zero-length data on ext4 file 
system.
{quote} "Delayed allocation" means that the filesystem tries to delay the 
allocation of physical disk blocks for written data for as long as possible. 
This policy brings some important performance benefits. Many files are 
short-lived; delayed allocation can keep the system from writing fleeting 
temporary files to disk at all. And, for longer-lived files, delayed allocation 
allows the kernel to accumulate more data and to allocate the blocks for data 
contiguously, speeding up both the write and any subsequent reads of that data. 
It's an important optimization which is found in most contemporary filesystems.

But, if blocks have not been allocated for a file, there is no need to write 
them quickly as a security measure. Since the blocks do not yet exist, it is 
not possible to read somebody else's data from them. So ext4 will not (cannot) 
write out unallocated blocks as part of the next journal commit cycle. Those 
blocks will, instead, wait until the kernel decides to flush them out; at that 
point, physical blocks will be allocated on disk and the data will be made 
persistent. The kernel doesn't like to let file data sit unwritten for too 
long, but it can still take a minute or so (with the default settings) for that 
data to be flushed - far longer than the five seconds normally seen with ext3. 
And that is why a crash can cause the loss of quite a bit more data when ext4 
is being used. 
{quote}
from: [https://lwn.net/Articles/322823/]
{quote}auto_da_alloc (*), noauto_da_alloc

Many broken applications don't use fsync() when replacing existing files via 
patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", 
"foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If 
auto_da_alloc is enabled, ext4 will detect the replace-via-rename and 
replace-via-truncate patterns and force th

[GitHub] [kafka] mjsax commented on a diff in pull request #14139: KAFKA-15022: [7/N] use RackAwareTaskAssignor in HAAssignor

2023-08-07 Thread via GitHub


mjsax commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1286401088


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -145,6 +163,14 @@ private void assignStandbyReplicaTasks(final TreeMap clientSt
 ClientState::assignStandby,
 standbyTaskAssignor::isAllowedTaskMovement
 );
+
+if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() 
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {

Review Comment:
   As above.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -208,19 +234,27 @@ private static boolean shouldMoveATask(final ClientState 
sourceClientState,
 }
 
 private static void assignStatelessActiveTasks(final TreeMap clientStates,
-   final Iterable 
statelessTasks) {
+   final Iterable 
statelessTasks,
+   final 
Optional rackAwareTaskAssignor) {
 final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad = 
new ConstrainedPrioritySet(
 (client, task) -> true,
 client -> clientStates.get(client).activeTaskLoad()
 );
 statelessActiveTaskClientsByTaskLoad.offerAll(clientStates.keySet());
 
+final SortedSet sortedTasks = new TreeSet<>();
 for (final TaskId task : statelessTasks) {
+sortedTasks.add(task);
 final UUID client = 
statelessActiveTaskClientsByTaskLoad.poll(task);
 final ClientState state = clientStates.get(client);
 state.assignActive(task);
 statelessActiveTaskClientsByTaskLoad.offer(client);
 }
+
+if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() 
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {

Review Comment:
   As above.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java:
##
@@ -56,52 +67,118 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getProcessRacksForAllProcess;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomCluster;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomProcessRacks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskChangelogMapForAllTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTopologyGroupTaskMap;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForChangelog;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.verifyStandbySatisfyRackReplica;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(Parameterized.

[GitHub] [kafka] philipnee closed pull request #14154: [TEST] Testing build failures for the commit

2023-08-07 Thread via GitHub


philipnee closed pull request #14154: [TEST] Testing build failures for the 
commit 
URL: https://github.com/apache/kafka/pull/14154


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-08-07 Thread via GitHub


ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1286370800


##
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import 
org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import 
org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+private static final Pattern TOPIC_PARTITION_PATTERN = 
Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?");
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (TerseException e) {
+System.err.println("Error occurred: " + e.getMessage());
+return 1;
+} catch (Throwable e) {
+System.err.println("Error occurred: " + e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String... args) throws IOException, 
ExecutionException, InterruptedException, TerseException {
+GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+GetOffsetShellOptions options = new GetOffsetShellOptions(args);
+
+Map partitionOffsets = 
getOffsetShell.fetchOffsets(options);
+
+for (Map.Entry entry : 
partitionOffsets.entrySet()) {
+TopicPartition topic = entry.getKey();
+
+System.out.println(String.join(":", new String[]{topic.topic(), 
String.valueOf(topic.partition()), entry.getValue().toString()}));
+}
+}
+
+private static class GetOffsetShellOptions extends CommandDefaultOptions {
+private final OptionSpec brokerListOpt;
+private final OptionSpec bootstrapServerOpt;
+private final OptionSpec topicPartitionsOpt;
+private final OptionSpec topicOpt;
+private final OptionSpec partitionsOpt;
+private final OptionSpec timeOpt;
+private final OptionSpec commandConfigOpt;
+private final OptionSpec effectiveBrokerListOpt;
+private final OptionSpecBuilder excludeInternalTopicsOpt;
+
+public GetOffsetShellOptions(String[] args) throws TerseException {
+super(args);
+
+brokerListOpt = parser.accepts("br

[GitHub] [kafka] C0urante commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-08-07 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1286347860


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,52 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenApply(v -> {
+Future secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+try {
+if (exactlyOnce) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());
+} catch (Exception e) {
+log.error("{} Got Exception when trying to flush 
tombstone(s) offsets to secondary store", this, e);
+}

Review Comment:
   Why are we catching these exceptions at all? Shouldn't we be throwing them 
so that the offset commit fails?
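
   Not part of the PR, but to make the question concrete: a minimal sketch of the
   direction this comment points at, where a failed or timed-out secondary write is
   surfaced to the caller instead of only being logged. The helper name and
   parameters below are invented for illustration, not the PR's eventual resolution:
   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import org.apache.kafka.connect.errors.ConnectException;

   final class TombstoneFlushSketch {
       /**
        * Wait for the secondary-store write and surface any failure to the caller,
        * so the offset commit fails instead of the error being swallowed by a log line.
        */
       static void awaitSecondaryWrite(Future<Void> secondaryWriteFuture,
                                       boolean exactlyOnce,
                                       long offsetFlushTimeoutMs) {
           try {
               if (exactlyOnce) {
                   secondaryWriteFuture.get();
               } else {
                   secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
               }
           } catch (ExecutionException e) {
               throw new ConnectException("Failed to flush tombstone offsets to the secondary store", e.getCause());
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new ConnectException("Interrupted while flushing tombstone offsets", e);
           } catch (TimeoutException e) {
               throw new ConnectException("Timed out flushing tombstone offsets to the secondary store", e);
           }
       }
   }
   ```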



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on a diff in pull request #14096: KAFKA-14595 AdminUtils rewritten in java

2023-08-07 Thread via GitHub


nizhikov commented on code in PR #14096:
URL: https://github.com/apache/kafka/pull/14096#discussion_r1286331468


##
server-common/src/main/java/org/apache/kafka/admin/AdminUtils.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.admin;
+
+import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.server.common.AdminOperationException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class AdminUtils {
+static final Random RAND = new Random();
+
+public static final String ADMIN_CLIENT_ID = "__admin_client";
+
+public static Map> 
assignReplicasToBrokers(Collection brokerMetadatas,
+  int 
nPartitions,
+  int 
replicationFactor) {
+return assignReplicasToBrokers(brokerMetadatas, nPartitions, 
replicationFactor, -1, -1);
+}
+
+/**
+ * There are 3 goals of replica assignment:
+ *
+ * 
+ *  Spread the replicas evenly among brokers.
+ *  For partitions assigned to a particular broker, their other 
replicas are spread over the other brokers.
+ *  If all brokers have rack information, assign the replicas for each 
partition to different racks if possible
+ * 
+ *
+ * To achieve this goal for replica assignment without considering racks, 
we:
+ * 
+ *  Assign the first replica of each partition by round-robin, 
starting from a random position in the broker list.
+ *  Assign the remaining replicas of each partition with an increasing 
shift.
+ * 
+ *
+ * Here is an example of assigning
+ * 
+ * 
broker-0broker-1broker-2broker-3broker-4 
+ * p0  p1  p2  p3  
p4  (1st replica)
+ * p5  p6  p7  p8  
p9  (1st replica)
+ * p4  p0  p1  p2  
p3  (2nd replica)
+ * p8  p9  p5  p6  
p7  (2nd replica)
+ * p3  p4  p0  p1  
p2  (3nd replica)
+ * p7  p8  p9  p5  
p6  (3nd replica)
+ * 
+ *
+ * 
+ * To create rack aware assignment, this API will first create a rack 
alternated broker list. For example,
+ * from this brokerID -> rack mapping:
+ * 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 
-> "rack1"
+ * 
+ * 
+ * The rack alternated list will be:
+ * 
+ * 0, 3, 1, 5, 4, 2
+ * 
+ * 
+ * Then an easy round-robin assignment can be applied. Assume 6 partitions 
with replication factor of 3, the assignment
+ * will be:
+ * 
+ * 0 -> 0,3,1 
+ * 1 -> 3,1,5 
+ * 2 -> 1,5,4 
+ * 3 -> 5,4,2 
+ * 4 -> 4,2,0 
+ * 5 -> 2,0,3 
+ * 
+ * 
+ * Once it has completed the first round-robin, if there are more 
partitions to assign, the algorithm will start
+ * shifting the followers. This is to ensure we will not always get the 
same set of sequences.
+ * In this case, if there is another partition to assign (partition #6), 
the assignment will be:
+ * 
+ * 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
+ * 
+ * 
+ * The rack aware assignment always chooses the 1st replica of the 
partition using round robin on the rack alternated
+ * broker list. For rest of the replicas, it will be biased towards 
brokers on racks that do not have
+ * any replica assignment, until every rack has a replica. Then the 
assignment will go back to round-robin on
+ * the broker list.
+ * 
+ * 
+ * 
+ * As the result, if the number of replicas is equal to or greater than 
the number of racks, it will ensure that
+ * each rack will get at least one replica. Otherwise, each rack will get 
at most one replica. In a perfect
+ * s
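
Not part of the diff, but as a reading aid for the javadoc above, here is a small
self-contained sketch of how a rack-alternated broker list can be built by
round-robining over the racks in sorted order. The class and method names are
illustrative, not the actual AdminUtils code:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public final class RackAlternatedList {
    /**
     * Build a rack-alternated broker list by visiting racks round-robin,
     * with racks and the brokers within each rack taken in sorted order.
     */
    public static List<Integer> rackAlternatedBrokerList(Map<Integer, String> brokerToRack) {
        // rack -> broker ids, both sorted
        SortedMap<String, List<Integer>> brokersByRack = new TreeMap<>();
        new TreeMap<>(brokerToRack).forEach((broker, rack) ->
                brokersByRack.computeIfAbsent(rack, r -> new ArrayList<>()).add(broker));

        List<String> racks = new ArrayList<>(brokersByRack.keySet());
        int[] nextPerRack = new int[racks.size()];
        List<Integer> result = new ArrayList<>();
        int rackIndex = 0;
        while (result.size() < brokerToRack.size()) {
            List<Integer> brokers = brokersByRack.get(racks.get(rackIndex));
            if (nextPerRack[rackIndex] < brokers.size()) {
                result.add(brokers.get(nextPerRack[rackIndex]++));
            }
            rackIndex = (rackIndex + 1) % racks.size();
        }
        return result;
    }

    public static void main(String[] args) {
        // The brokerId -> rack mapping from the javadoc example; expected output: [0, 3, 1, 5, 4, 2]
        Map<Integer, String> racks = Map.of(0, "rack1", 1, "rack3", 2, "rack3",
                3, "rack2", 4, "rack2", 5, "rack1");
        System.out.println(rackAlternatedBrokerList(racks));
    }
}
```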

[GitHub] [kafka] C0urante commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-07 Thread via GitHub


C0urante commented on code in PR #14064:
URL: https://github.com/apache/kafka/pull/14064#discussion_r1286292051


##
tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java:
##
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
+import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
+import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConnectPluginPath {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+private static final Object[] LIST_TABLE_COLUMNS = {
+"pluginName",
+"firstAlias",
+"secondAlias",
+"pluginVersion",
+"pluginType",
+"isLoadable",
+"hasManifest",
+"pluginLocation" // last because it is least important and most 
repetitive
+};
+
+public static void main(String[] args) {
+Exit.exit(mainNoExit(args, System.out, System.err));
+}
+
+public static int mainNoExit(String[] args, PrintStream out, PrintStream 
err) {
+ArgumentParser parser = parser();
+try {
+Namespace namespace = parser.parseArgs(args);
+Config config = parseConfig(parser, namespace, out);
+runCommand(config);
+return 0;
+} catch (ArgumentParserException e) {
+parser.handleError(e);
+return 1;
+} catch (TerseException e) {
+err.println(e.getMessage());
+return 2;
+} catch (Throwable e) {
+err.println(e.getMessage());
+err.println(Utils.stackTrace(e));
+return 3;
+}
+}
+
+private static ArgumentParser parser() {
+ArgumentParser parser = 
ArgumentParsers.newArgumentParser("connect-plugin-path")
+.defaultHelp(true)
+.description("Manage plugins on the Connect plugin.path");
+
+ArgumentParser listCommand = parser.addSubparsers()
+.description("List information about plugins contained within the 
specified plugin locations")
+.dest("subcommand")
+.addParser("list");
+
+ArgumentParser[] subparsers = new ArgumentParser[] {
+listCommand,
+};
+
+for (ArgumentParser subparser : subparsers) {
+ArgumentGroup pluginProviders = subparser.addArgumentGroup("plugin 
providers");
+pluginProvid

[GitHub] [kafka] gharris1727 commented on pull request #12806: KAFKA-14345: Fix flakiness with more accurate bound in (Dynamic)ConnectionQuotaTest

2023-08-07 Thread via GitHub


gharris1727 commented on PR #12806:
URL: https://github.com/apache/kafka/pull/12806#issuecomment-1668440375

   Hey @divijvaidya .
   
   > aren't we then accepting the current behaviour of quotas implementation as 
expected behaviour i.e. the deviation could be +- epsilon and epsilon could 
exceed the thresholds (max.connection.creation.rate) set for a windowing period.
   
   It is not possible to eliminate the deviation visible to the outside 
observer, and these tests will always need to include an error term. Here's 
what the error bounds will be for the default window configuration and various 
observation times with the variable-width and fixed-width windowing algorithms:
   ```
   millis  variable-width  fixed-width
   110012.4446  2.2
   200011.736842105263158   2.2
   220011.736842105263158   1.5714285714285714
   300011.5172413793103448  1.5714285714285714
   330011.5172413793103448  1.375
   400011.4102564102564104  1.375
   440011.4102564102564104  1.2790697674418605
   500011.346938775510204   1.2790697674418605
   550011.346938775510204   1.2223
   600011.305084745762712   1.2223
   660011.305084745762712   1.1846153846153846
   700011.2753623188405796  1.1846153846153846
   770011.2753623188405796  1.1578947368421053
   800011.2531645569620253  1.1578947368421053
   880011.2531645569620253  1.1379310344827587
   900011.2359550561797752  1.1379310344827587
   990011.2359550561797752  1.1224489795918366
   ```
   
   This PR implements the variable-width limits because the variable-width 
algorithm is currently on trunk, not because it is more accepted or correct. 
I'm personally ambivalent about which algorithm should be used. This PR 
replaces the un-motivated and incorrect hardcoded constants with computed 
bounds, and that is beneficial with or without the fixed-width algorithm.
   
   > Alternatively, the fix i.e. https://github.com/apache/kafka/pull/12045 
doesn't require a KIP (it doesn't change any public interfaces). Please feel 
free to pick up that PR (or duplicate it). I won't have time any time soon to 
work on it. I will be happy to provide a review.
   
   If you don't have time or interest to work that PR and/or KIP, then I think 
a test-only fix is appropriate until someone is interested in picking up the 
PR.  The flaky tests have already made us aware of the odd behavior of the 
variable-width algorithm, so more ongoing failures aren't helpful.
   
   > Having said that, the answer might be that the current behaviour is 
accepted behaviour. In which case, I would be comfortable with this change if 
it is accompanied by a change in docs explaining the current expectations from 
the windowing algorithm so that the users at least know what their expectation 
from the quota implementation should be.
   
   I don't think that what this PR addresses needs to be explained in the 
documentation.
   
   * The existing quotas documentation is very general, and focuses on the 
strategic usage of quotas. This extremely specific error bound would be out of 
place among the other documentation.
   * How users choose their rate limit depends on so many more important 
factors than this error term (such as network environment, hardware, etc) that 
I think to mention the error factor in specific would mislead users to think it 
is more important than it is.
   * Users will already be compensating for this error term when setting their 
limits. If someone observes that the effective rate limit (either for bursts or 
steady-state flows) is too high and affects their cluster health, they will 
lower their configured rate limit to compensate. They may be blaming the wrong 
thing (their hardware vs the rate limit algorithm) but the effect is still the 
same.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-08-07 Thread via GitHub


C0urante commented on code in PR #14064:
URL: https://github.com/apache/kafka/pull/14064#discussion_r1286274051


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##
@@ -206,6 +207,12 @@ public static Set pluginLocations(String pluginPath) 
{
 for (String path : pluginPathElements) {
 try {
 Path pluginPathElement = Paths.get(path).toAbsolutePath();
+if (!pluginPath.isEmpty()) {

Review Comment:
   ```suggestion
   if (pluginPath.isEmpty()) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14068: KAFKA-15233: Add documentation for plugin.discovery and connect-plugin-path (KIP-898)

2023-08-07 Thread via GitHub


C0urante commented on code in PR #14068:
URL: https://github.com/apache/kafka/pull/14068#discussion_r1286204067


##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
connectors. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
service_load is the fastest strategy, but will hide incompatible 
plugins and may make some unusable. Care should be taken to verify that plugins 
are compatible before setting this configuration to 
service_load.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the only_scan mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to hybrid_warn which is 
also compatible with all plugins, but logs a warning for plugins which are 
incompatible with service_load. The hybrid_fail 
strategy stops the worker with an error if a plugin incompatible with 
service_load is detected, asserting that all plugins are 
compatible. Finally, the service_load strategy will hide 
incompatible plugins in the REST API, and may make some unusable entirely.

Review Comment:
   I don't love the level of detail in "hide incompatible plugins in the REST 
API, and may make some unusable entirely", since this isn't really behavior 
that we've implemented intentionally. I'd rather be more general here and just 
say "may make some plugins unusable"; IMO the term "unusable" gives us enough 
wiggle room that it covers both "completely" unusable (i.e., unloadable) and 
"partially" unusable (i.e., hidden but loadable).



##
docs/connect.html:
##
@@ -543,6 +543,67 @@ ACL requirements
 
 
 
+Plugin 
Discovery
+
+Plugin discovery is the name for the strategy which the Connect worker 
uses to find plugin classes and make them accessible to configure and run in 
Connectors and Tasks. This is controlled by the plugin.discovery worker 
configuration, and has a significant impact on worker startup time. 
SERVICE_LOAD is the fastest strategy, but care should be taken to 
verify that plugins are compatible before setting this configuration to 
SERVICE_LOAD.
+
+Prior to version 3.6, this strategy was not configurable, and behaved 
like the ONLY_SCAN mode which is compatible with all plugins. For 
version 3.6 and later, this mode defaults to HYBRID_WARN which is 
also compatible with all plugins, but logs a warning for all plugins which are 
incompatible with the other modes. For unit-test environments that use the 
EmbeddedConnectCluster this defaults to the 
HYBRID_FAIL strategy, which stops the worker with an error if an 
incompatible plugin is detected. Finally, the SERVICE_LOAD 
strategy will silently hide incompatible plugins and make them unusable.

Review Comment:
   > I think that including "incompatible with service_load" is less correct 
but less confusing.
   
   My take is that in general, it's okay to provide information that is not 
comprehensive but, within a limited scope, accurate. The idea is to only cover 
scenarios that are highly relevant to users. For example: yes, it's technically 
correct that plugins without manifests are incompatible with `hybrid_warn`, but 
since the ultimate goal of this feature is to progress towards `service_load`, 
it's okay if we only provide information with that context in mind.



##
docs/connect.html:
##
@@ -577,7 +638,11 @@ Developing a Simple Co
 
 Connector Example
 
-We'll cover the SourceConnector as a simple example. 
SinkConnector implementations are very similar. Start by creating 
the class that inherits from SourceConnector and add a field that 
will store the configuration information to be propagated to the task(s) (the 
topic to send data to, and optionally - the filename to read from and the 
maximum batch size):
+We'll cover the SourceConnector as a simple example. 
SinkConnector implementations are very similar. Pick a package and 
class name, these examples will use the FileStreamSourceConnector 
but substitute your own class name where appropriate. In order to make the plugin discoverable at runtime, 
add a ServiceLoader manifest to your resources in 
META-INF/services/org.apache.kafka.connect.source.SourceConnector 
with your fully-qualified class name on a single line:
+
+com.example.FileStreamSourceConnector
+
+Create a class that inherits from SourceConnector and add 
a field that will store the configuration information to be propagated to the 
task(s) (the topic to send data to, and optionally - the filename to read from 
and the maximum batch size):

Review Comment:
   Worth adding the `package com.example` directive to the `public class 
FileStreamSourceConnector ...` code snippet, since the package name serves a 
f
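
   To make that suggestion concrete, here is a sketch of how the package directive,
   the connector class, and the ServiceLoader manifest line up. The stub task exists
   only so the example compiles; it is not part of the documented example:
   ```java
   // src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
   // must contain the single line:  com.example.FileStreamSourceConnector

   // src/main/java/com/example/FileStreamSourceConnector.java
   package com.example;

   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.connect.connector.Task;
   import org.apache.kafka.connect.source.SourceConnector;
   import org.apache.kafka.connect.source.SourceRecord;
   import org.apache.kafka.connect.source.SourceTask;

   public class FileStreamSourceConnector extends SourceConnector {
       private Map<String, String> props;

       @Override public void start(Map<String, String> props) { this.props = props; }
       @Override public Class<? extends Task> taskClass() { return StubTask.class; }
       @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return Collections.singletonList(props); }
       @Override public void stop() { }
       @Override public ConfigDef config() { return new ConfigDef(); }
       @Override public String version() { return "1.0"; }

       // Placeholder task so the sketch compiles; a real connector would read the file here.
       public static class StubTask extends SourceTask {
           @Override public void start(Map<String, String> props) { }
           @Override public List<SourceRecord> poll() { return Collections.emptyList(); }
           @Override public void stop() { }
           @Override public String version() { return "1.0"; }
       }
   }
   ```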

[GitHub] [kafka] mjsax merged pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-07 Thread via GitHub


mjsax merged PR #14150:
URL: https://github.com/apache/kafka/pull/14150


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-07 Thread via GitHub


mjsax commented on code in PR #14150:
URL: https://github.com/apache/kafka/pull/14150#discussion_r1286245745


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,
+null,

Review Comment:
   ```suggestion
   null,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,
+null,
+Importance.MEDIUM,

Review Comment:
   ```suggestion
   Importance.MEDIUM,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,
+null,
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC)
+.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+Type.STRING,
+RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
 .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
 Type.LIST,
 Collections.emptyList(),
 atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE),
 Importance.MEDIUM,
 RACK_AWARE_ASSIGNMENT_TAGS_DOC)
+.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+Type.INT,
+null,
+Importance.MEDIUM,

Review Comment:
   ```suggestion
   Importance.MEDIUM,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,
+null,
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC)
+.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+Type.STRING,
+RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
 .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
 Type.LIST,
 Collections.emptyList(),
 atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE),
 Importance.MEDIUM,
 RACK_AWARE_ASSIGNMENT_TAGS_DOC)
+.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+Type.INT,
+null,

Review Comment:
   ```suggestion
   null,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,

Review Comment:
   ```suggestion
   Type.INT,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG
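
Not part of the review itself, but as a usage illustration of the configs this diff
introduces, assuming the constants land as named here; the cost values are arbitrary
examples, not recommended defaults:
```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class RackAwareConfigExample {
    public static Properties rackAwareProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Opt in to the min-traffic rack-aware strategy and tune the optimizer costs.
        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
                StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);
        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, 10);
        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, 1);
        return props;
    }
}
```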

[GitHub] [kafka] rondagostino commented on a diff in pull request #14160: KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration

2023-08-07 Thread via GitHub


rondagostino commented on code in PR #14160:
URL: https://github.com/apache/kafka/pull/14160#discussion_r1286234750


##
docs/ops.html:
##
@@ -3778,6 +3778,14 @@ Migrating brokers to KRaft
 Each broker is restarted with a KRaft configuration until the entire 
cluster is running in KRaft mode.
   
 
+  Reverting to ZooKeeper mode During the Migration
+While the cluster is still in migration mode, it is possible to revert to 
ZK mode. In order to do this:
+
+  One by one, take each KRaft broker down, and restart the broker as 
KRaft.

Review Comment:
   s/restart the broker as KRaft/restart the broker as ZooKeeper/



##
docs/ops.html:
##
@@ -3615,7 +3615,7 @@ The following features are not yet supported for ZK to KRaft 
migrations:
 
   
-Downgrading to ZooKeeper mode during or after the migration
+Reverting to ZooKeeper mode after the migration
 Other features not yet supported in 
KRaft
   

Review Comment:
   I'm not sure we should say "Reverting to ZooKeeper mode after the migration" 
since the implication is that this isn't supported **yet**.  In fact it will 
never be supported.  So maybe this should just point to the list of unsupported 
features only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #14155: MINOR: update Kafka Streams state.dir doc

2023-08-07 Thread via GitHub


mjsax commented on PR #14155:
URL: https://github.com/apache/kafka/pull/14155#issuecomment-1668327508

   Cherry-picked to `3.5, [...], 2.8` branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751761#comment-17751761
 ] 

Matthias J. Sax commented on KAFKA-15302:
-

Thanks for providing more details; the `delete()` calls that interleave with
calls to the all-iterator should be the root cause; the delete call was missing
in the original description. So we now know where the flush/evict is coming
from. Thanks for confirming!

Still not sure how to fix it though right now. [~guozhang] any ideas?
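
To spell out the access pattern referred to above, here is a minimal sketch (the
class and method are hypothetical) of a punctuation-time loop that deletes entries
while an all() iterator is open, which is what lets the cache flush/evict underneath
the iterator:
```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

final class InterleavedDeleteSketch {
    static void forwardAndDelete(final KeyValueStore<String, Integer> store) {
        try (KeyValueIterator<String, Integer> iter = store.all()) {
            while (iter.hasNext()) {
                final KeyValue<String, Integer> entry = iter.next();
                store.delete(entry.key); // interleaved write: may flush the cache under the open iterator
            }
        }
    }
}
```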

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been updated from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
> //
>     System.err.println("forwardAll Start");    KeyValueIterator Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue entry = kvList.next();
>         final Record msg = new Record<>(entry.key, 
> entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);        if 
> (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: 
> " + entry.key + " Expected in stored(Cache or Store) value: " + storeValue + 
> " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }        this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=636397, #flushes=401
> 01:05:00.388 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named Cache flush 
> dirtyKeys.size():7873 entries:7873
> 01:05:00.434 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.p.i.ProcessorStateManager -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  stream-task [0_0] Flushed cache or buffer Counts
> ...
> 01:05:00.587 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.s.i.CachingKeyValueStore --  KeyValueIterator 
> all()
> 01:05:00.588 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.RocksDBStore --  RocksDB KeyValueIterator all
> 01:05:00.590 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.ThreadCache -- stream-thread 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>   MemoryLRUCacheBytesIterator cache all()
> 01:05:00.591 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.Nam

[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751759#comment-17751759
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

{quote}Should we create a new ticket for Kafka Producer?
{quote}
As mentioned above, I created https://issues.apache.org/jira/browse/KAFKA-15259 
for the producer already. If it's not covering what you think we need, just 
leave a comment on the ticket.

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE is used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continues processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Transiting to abortable error state 
> due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized w
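
For context, a handler along the lines the reproducer describes would look roughly
like this; the class name is made up, and it would be registered through the
default.production.exception.handler config:
```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

/** Ignore failed sends (e.g. RecordTooLargeException) and keep processing. */
public class ContinueOnRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
```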

[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2023-08-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751756#comment-17751756
 ] 

Matthias J. Sax commented on KAFKA-14747:
-

We can discuss here on the ticket I guess. The class in question is 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java#L114]

We already added a `TRACE` log for this case recently.

For using the dropped record sensor, cf. 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java#L66-L70]
 that does the same thing.

Let me know if this helps to get you bootstrapped.
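
A minimal sketch of the wiring described above, mirroring what
SubscriptionReceiveProcessorSupplier already does; the names here are illustrative
and this is not the final patch:
```java
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;

final class DroppedResponseSensorSketch {
    private Sensor droppedRecordsSensor;

    void init(final ProcessorContext<?, ?> context) {
        droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
            Thread.currentThread().getName(),
            context.taskId().toString(),
            (StreamsMetricsImpl) context.metrics());
    }

    void onDiscardedResponse() {
        // Record where ResponseJoinProcessorSupplier currently only logs at TRACE on a hash mismatch.
        droppedRecordsSensor.record();
    }
}
```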

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processed the responses if the returned hash matches to current hash of the 
> left-hand side record, because a different hash implies that the lef- hand 
> side record was updated in the mean time (including sending a new 
> subscription to the right hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, that might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> add a new sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] CalvinConfluent commented on pull request #14138: Fix a race when query isUnderMinIsr

2023-08-07 Thread via GitHub


CalvinConfluent commented on PR #14138:
URL: https://github.com/apache/kafka/pull/14138#issuecomment-1668274095

   @jolshan Can you help take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move

2023-08-07 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-15312:
---
Description: 
Not all file systems fsync to disk on close. For KRaft to guarantee that the
data has made it to disk before calling rename, it needs to make sure that the
file has been fsynced.

We have seen cases where the snapshot file has zero-length data on an ext4 file
system.
{quote} "Delayed allocation" means that the filesystem tries to delay the 
allocation of physical disk blocks for written data for as long as possible. 
This policy brings some important performance benefits. Many files are 
short-lived; delayed allocation can keep the system from writing fleeting 
temporary files to disk at all. And, for longer-lived files, delayed allocation 
allows the kernel to accumulate more data and to allocate the blocks for data 
contiguously, speeding up both the write and any subsequent reads of that data. 
It's an important optimization which is found in most contemporary filesystems.

But, if blocks have not been allocated for a file, there is no need to write 
them quickly as a security measure. Since the blocks do not yet exist, it is 
not possible to read somebody else's data from them. So ext4 will not (cannot) 
write out unallocated blocks as part of the next journal commit cycle. Those 
blocks will, instead, wait until the kernel decides to flush them out; at that 
point, physical blocks will be allocated on disk and the data will be made 
persistent. The kernel doesn't like to let file data sit unwritten for too 
long, but it can still take a minute or so (with the default settings) for that 
data to be flushed - far longer than the five seconds normally seen with ext3. 
And that is why a crash can cause the loss of quite a bit more data when ext4 
is being used. 
{quote}
from: [https://lwn.net/Articles/322823/]
{quote}auto_da_alloc (*), noauto_da_alloc

Many broken applications don't use fsync() when replacing existing files via 
patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", 
"foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If 
auto_da_alloc is enabled, ext4 will detect the replace-via-rename and 
replace-via-truncate patterns and force that any delayed allocation blocks are 
allocated such that at the next journal commit, in the default data=ordered 
mode, the data blocks of the new file are forced to disk before the rename() 
operation is committed. This provides roughly the same level of guarantees as 
ext3, and avoids the "zero-length" problem that can happen when a system 
crashes before the delayed allocation blocks are forced to disk.
{quote}
from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]

 

  was:
Not all file systems fsync to disk on close. For KRaft to guarantee that the
data has made it to disk before calling rename, it needs to make sure that the
file has been fsynced.

We have seen cases where the snapshot file has zero-length data on an ext4 file
system.
{quote} "Delayed allocation" means that the filesystem tries to delay the 
allocation of physical disk blocks for written data for as long as possible. 
This policy brings some important performance benefits. Many files are 
short-lived; delayed allocation can keep the system from writing fleeting 
temporary files to disk at all. And, for longer-lived files, delayed allocation 
allows the kernel to accumulate more data and to allocate the blocks for data 
contiguously, speeding up both the write and any subsequent reads of that data. 
It's an important optimization which is found in most contemporary filesystems.

But, if blocks have not been allocated for a file, there is no need to write 
them quickly as a security measure. Since the blocks do not yet exist, it is 
not possible to read somebody else's data from them. So ext4 will not (cannot) 
write out unallocated blocks as part of the next journal commit cycle. Those 
blocks will, instead, wait until the kernel decides to flush them out; at that 
point, physical blocks will be allocated on disk and the data will be made 
persistent. The kernel doesn't like to let file data sit unwritten for too 
long, but it can still take a minute or so (with the default settings) for that 
data to be flushed - far longer than the five seconds normally seen with ext3. 
And that is why a crash can cause the loss of quite a bit more data when ext4 
is being used. 
{quote}
from: [https://lwn.net/Articles/322823/]
{quote}auto_da_alloc(*), noauto_da_alloc

Many broken applications don't use fsync() when replacing existing files via 
patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", 
"foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If 
auto_da_alloc is enabled, ext4 will detect the replace-via-rename and 
replace-via-truncate patterns and force that 

[jira] [Created] (KAFKA-15312) FileRawSnapshotWriter must flush before atomic move

2023-08-07 Thread Jira
José Armando García Sancio created KAFKA-15312:
--

 Summary: FileRawSnapshotWriter must flush before atomic move
 Key: KAFKA-15312
 URL: https://issues.apache.org/jira/browse/KAFKA-15312
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio
 Fix For: 3.6.0


Not all file systems fsync to disk on close. For KRaft to guarantee that the 
data has made it to disk before calling rename, it needs to make sure that the 
file has been fsynced.

We have seen cases where the snapshot file has zero-length data on the ext4 file 
system.
{quote} "Delayed allocation" means that the filesystem tries to delay the 
allocation of physical disk blocks for written data for as long as possible. 
This policy brings some important performance benefits. Many files are 
short-lived; delayed allocation can keep the system from writing fleeting 
temporary files to disk at all. And, for longer-lived files, delayed allocation 
allows the kernel to accumulate more data and to allocate the blocks for data 
contiguously, speeding up both the write and any subsequent reads of that data. 
It's an important optimization which is found in most contemporary filesystems.

But, if blocks have not been allocated for a file, there is no need to write 
them quickly as a security measure. Since the blocks do not yet exist, it is 
not possible to read somebody else's data from them. So ext4 will not (cannot) 
write out unallocated blocks as part of the next journal commit cycle. Those 
blocks will, instead, wait until the kernel decides to flush them out; at that 
point, physical blocks will be allocated on disk and the data will be made 
persistent. The kernel doesn't like to let file data sit unwritten for too 
long, but it can still take a minute or so (with the default settings) for that 
data to be flushed - far longer than the five seconds normally seen with ext3. 
And that is why a crash can cause the loss of quite a bit more data when ext4 
is being used. 
{quote}
from: [https://lwn.net/Articles/322823/]
{quote}auto_da_alloc(*), noauto_da_alloc

Many broken applications don't use fsync() when replacing existing files via 
patterns such as fd = open("foo.new")/write(fd,..)/close(fd)/ rename("foo.new", 
"foo"), or worse yet, fd = open("foo", O_TRUNC)/write(fd,..)/close(fd). If 
auto_da_alloc is enabled, ext4 will detect the replace-via-rename and 
replace-via-truncate patterns and force that any delayed allocation blocks are 
allocated such that at the next journal commit, in the default data=ordered 
mode, the data blocks of the new file are forced to disk before the rename() 
operation is committed. This provides roughly the same level of guarantees as 
ext3, and avoids the "zero-length" problem that can happen when a system 
crashes before the delayed allocation blocks are forced to disk.
{quote}
from: [https://www.kernel.org/doc/html/latest/admin-guide/ext4.html]
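
As a rough illustration of the fix described above (not the actual 
FileRawSnapshotWriter code), the writer can force the file contents to disk 
before performing the atomic rename, so a crash can no longer leave a 
zero-length snapshot behind:
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: fsync the temporary snapshot file before the atomic move.
public class AtomicSnapshotMove {
    public static void commit(Path tmpSnapshot, Path finalSnapshot) throws IOException {
        // Force the data (and metadata) of the temporary file to disk.
        try (FileChannel channel = FileChannel.open(tmpSnapshot, StandardOpenOption.WRITE)) {
            channel.force(true);
        }
        // Only then perform the atomic rename; fsyncing the parent directory
        // (omitted here) is also needed to make the rename itself durable.
        Files.move(tmpSnapshot, finalSnapshot, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}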

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov opened a new pull request, #14161: KAFKA-15267: Do not allow Tiered Storage to be disabled while topics have remote.storage.enable property

2023-08-07 Thread via GitHub


clolov opened a new pull request, #14161:
URL: https://github.com/apache/kafka/pull/14161

   # TODO
   I need to find where the unit/integration tests for this behaviour are in 
order to add the manual test to them
   
   ### Summary
   
   The purpose of this change is to not allow a broker to start up with Tiered 
Storage disabled (`remote.log.storage.system.enable=false`) while there are 
still topics which have `remote.storage.enable` set.
   
   ### Testing
   ```
   [2023-08-07 18:01:30,654] ERROR Exiting Kafka due to fatal exception during 
startup. (kafka.Kafka$)
   org.apache.kafka.common.config.ConfigException: You have to delete all 
topics with the property remote.storage.enable (i.e. mrfreeze) before disabling 
tiered storage cluster-wide
at 
kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:71)
at 
kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:175)
at 
kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:174)
at scala.collection.immutable.Map$Map4.foreach(Map.scala:569)
at 
kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:174)
at 
kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:165)
at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115)
at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:165)
at kafka.server.KafkaServer.startup(KafkaServer.scala:571)
at kafka.Kafka$.main(Kafka.scala:113)
at kafka.Kafka.main(Kafka.scala)
   ```
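   
   A rough sketch (not the PR's actual code) of the kind of startup check 
described above; the property names are real Kafka configurations, everything 
else is illustrative:
   
   ```java
   import java.util.Map;

   // Hypothetical validation: refuse to start when tiered storage is disabled
   // cluster-wide but some topic still has remote.storage.enable=true.
   final class TieredStorageStartupCheck {
       static void validate(boolean systemWideTieredStorageEnabled,
                            Map<String, Boolean> remoteStorageEnabledByTopic) {
           if (systemWideTieredStorageEnabled) {
               return;
           }
           remoteStorageEnabledByTopic.forEach((topic, enabled) -> {
               if (Boolean.TRUE.equals(enabled)) {
                   throw new IllegalStateException(
                       "You have to delete all topics with the property remote.storage.enable (i.e. "
                           + topic + ") before disabling tiered storage cluster-wide");
               }
           });
       }
   }
   ```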


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors

2023-08-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751747#comment-17751747
 ] 

Chris Egerton commented on KAFKA-15310:
---

Hey [~romsouza], thanks for filing this ticket! Since the proposed changes 
affect a public interface (i.e., they add a configuration property to part of the 
project), a KIP is required. You can read up on the KIP process 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals].
 Thanks again for your interest in contributing to Kafka!

> Add timezone configuration option in TimestampConverter from connectors
> ---
>
> Key: KAFKA-15310
> URL: https://issues.apache.org/jira/browse/KAFKA-15310
> Project: Kafka
>  Issue Type: New Feature
>  Components: config, connect
>Reporter: Romulo Souza
>Priority: Minor
>  Labels: needs-kip
> Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de 
> tela de 2023-08-05 09-44-25-1.png
>
>
> In some scenarios where TimestampConverter is used, it's interesting to have an 
> option to determine a specific timezone other than the hardcoded UTC. E.g., there 
> are use cases where a sink connector sends data to a database and this same data 
> is used in an analysis tool without formatting and transformation options.
> A new optional Kafka Connect configuration should be added to set the desired 
> timezone, falling back to UTC when it is not informed.
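
For context, a minimal sketch of how such a connector configuration could look; 
the existing TimestampConverter properties are real, while the {{timezone}} 
property is only the proposal above and does not exist yet:
{code}
transforms=ts
transforms.ts.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ts.field=event_time
transforms.ts.target.type=string
transforms.ts.format=yyyy-MM-dd HH:mm:ss
# Proposed (hypothetical) option with a fallback to UTC when not set:
# transforms.ts.timezone=America/Sao_Paulo
{code}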



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15310) Add timezone configuration option in TimestampConverter from connectors

2023-08-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15310:
--
Labels: needs-kip  (was: )

> Add timezone configuration option in TimestampConverter from connectors
> ---
>
> Key: KAFKA-15310
> URL: https://issues.apache.org/jira/browse/KAFKA-15310
> Project: Kafka
>  Issue Type: New Feature
>  Components: config, connect
>Reporter: Romulo Souza
>Priority: Minor
>  Labels: needs-kip
> Attachments: Captura de tela de 2023-08-05 09-43-54-1.png, Captura de 
> tela de 2023-08-05 09-44-25-1.png
>
>
> In some scenarios where TimestampConverter is used, it's interesting to have an 
> option to determine a specific timezone other than the hardcoded UTC. E.g., there 
> are use cases where a sink connector sends data to a database and this same data 
> is used in an analysis tool without formatting and transformation options.
> A new optional Kafka Connect configuration should be added to set the desired 
> timezone, falling back to UTC when it is not informed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #14156: KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced

2023-08-07 Thread via GitHub


gharris1727 commented on PR #14156:
URL: https://github.com/apache/kafka/pull/14156#issuecomment-1668233376

   @monish-byte This specific PR thread isn't the right place to discuss this, 
so please in the future post your general questions on the dev mailing list.
   
   Your next steps should be:
   > ask [on the mailing list] for access to the JIRA and Confluence. From 
there, you can ask more questions [on the mailing list] about contributing, 
submit your own issues, and work on issues that have already been reported.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14158: KAFKA-15296: Allow offsets to be committed for filtered records when Exactly Once support is disabled

2023-08-07 Thread via GitHub


C0urante commented on PR #14158:
URL: https://github.com/apache/kafka/pull/14158#issuecomment-1668229509

   I've been scratching my head over this one for a bit. On one hand, it's 
nice to allow heavily-filtered source connectors to record progress (and this 
was a suggestion I made to address part of the motivation for 
[KIP-910](https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records))
 so that there are fewer duplicates if one is restarted.
   
   However, the current behavior when exactly-once support is disabled also has 
some benefits. Right now it's possible to write an SMT that does batching of 
many source records into a single Kafka record.
   
   I'm also curious--what's the behavior with sink connectors when records are 
filtered via SMT? Does this vary depending on whether the connector's task 
class overrides the `SinkTask::preCommit` method?
   
   @vamossagar12 Ultimately I agree that some work probably has to be done 
around this logic, and thanks for identifying the discrepancy. I'm just not 
certain that the decision I made to commit offsets for dropped records when 
working on exactly-once source connectors was the correct one, and think we 
should at least consider reverting that change in behavior rather than updating 
other, longer-existing modes to align with it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14598) Fix flaky ConnectRestApiTest

2023-08-07 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751735#comment-17751735
 ] 

Greg Harris edited comment on KAFKA-14598 at 8/7/23 4:33 PM:
-

[~ashwinpankaj] The PR for this is a one-line fix, and you already proved that 
it was fixing a flaky failure. I don't think this should be closed, as the 
problem has not been addressed.

If you no longer wish to work on this, leave it open and unassigned.


was (Author: gharris1727):
[~ashwinpankaj] The PR for this is a one-line fix, and you already proved that 
it was fixing a flaky failure. I don't think this should be closed, as the 
problem has not been addressed.

> Fix flaky ConnectRestApiTest
> 
>
> Key: KAFKA-14598
> URL: https://issues.apache.org/jira/browse/KAFKA-14598
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ashwin Pankaj
>Assignee: Ashwin Pankaj
>Priority: Minor
>  Labels: flaky-test
>
> ConnectRestApiTest sometimes fails with the message
> {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not 
> Found\n\nHTTP ERROR 404 Not 
> Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not
>  
> Found\nSERVLET:-\n\n\n\n\n',
>  'http://172.31.1.75:8083/connector-plugins/')}}
> This happens because ConnectDistributedService.start() by default waits until 
> the line
> {{Joined group at generation ..}} is visible in the logs.
> In most cases this is sufficient. But in the cases where the test fails, we 
> see that this message appears even before Connect RestServer has finished 
> initialization.
>  {quote}   - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, 
> groupId=connect-cluster] Joined group at generation 2 with protocol version 1 
> and got assignment: Assignment{error=0, 
> leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', 
> leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], 
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 
> +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" 
> "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer)
> - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is 
> started and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {quote}
>  
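
Illustrative only (not the actual system-test fix): a readiness check that polls 
the Connect REST API until it stops returning 404, rather than relying on the 
"Joined group" log line. The worker URL and timeout here are assumptions:
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class WaitForConnectRest {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:8083/connector-plugins/")).GET().build();
        long deadline = System.currentTimeMillis() + 60_000;
        while (System.currentTimeMillis() < deadline) {
            try {
                HttpResponse<String> resp = client.send(request, HttpResponse.BodyHandlers.ofString());
                if (resp.statusCode() == 200) {
                    return; // REST resources are initialized and serving requests
                }
            } catch (java.io.IOException e) {
                // server not accepting connections yet; fall through and retry
            }
            Thread.sleep(500);
        }
        throw new IllegalStateException("Connect REST API did not become ready in time");
    }
}
{code}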



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14598) Fix flaky ConnectRestApiTest

2023-08-07 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751735#comment-17751735
 ] 

Greg Harris commented on KAFKA-14598:
-

[~ashwinpankaj] The PR for this is a one-line fix, and you already proved that 
it was fixing a flaky failure. I don't think this should be closed, as the 
problem has not been addressed.

> Fix flaky ConnectRestApiTest
> 
>
> Key: KAFKA-14598
> URL: https://issues.apache.org/jira/browse/KAFKA-14598
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ashwin Pankaj
>Assignee: Ashwin Pankaj
>Priority: Minor
>  Labels: flaky-test
>
> ConnectRestApiTest sometimes fails with the message
> {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not 
> Found\n\nHTTP ERROR 404 Not 
> Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not
>  
> Found\nSERVLET:-\n\n\n\n\n',
>  'http://172.31.1.75:8083/connector-plugins/')}}
> This happens because ConnectDistributedService.start() by default waits until 
> the line
> {{Joined group at generation ..}} is visible in the logs.
> In most cases this is sufficient. But in the cases where the test fails, we 
> see that this message appears even before Connect RestServer has finished 
> initialization.
>  {quote}   - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, 
> groupId=connect-cluster] Joined group at generation 2 with protocol version 1 
> and got assignment: Assignment{error=0, 
> leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', 
> leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], 
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 
> +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" 
> "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer)
> - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is 
> started and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bachmanity1 commented on pull request #14153: KAFKA-14132: Replace Easymock & Powermock with Mockito in KafkaBasedLogTest

2023-08-07 Thread via GitHub


bachmanity1 commented on PR #14153:
URL: https://github.com/apache/kafka/pull/14153#issuecomment-1668221501

   Hi @divijvaidya
   
   Thanks for letting me know about ticket KAFKA-14132. It turns out that 
@mdedetrich has already begun working on this issue and has submitted a PR, but 
he said he doesn't have enough time to complete it and agreed to close it. 
Thus, I think we can handle this issue in this PR. When you have time, could you 
please review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15311) Fix docs about reverting to ZooKeeper mode during KRaft migration

2023-08-07 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15311:
-
Summary: Fix docs about reverting to ZooKeeper mode during KRaft migration  
(was: Docs incorrectly state that reverting to ZooKeeper mode during KRaft 
migration is not possible)

> Fix docs about reverting to ZooKeeper mode during KRaft migration
> -
>
> Key: KAFKA-15311
> URL: https://issues.apache.org/jira/browse/KAFKA-15311
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Minor
>
> The docs incorrectly state that reverting to ZooKeeper mode during KRaft 
> migration is not possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15311) Docs incorrectly state that reverting to ZooKeeper mode during KRaft migration is not possible

2023-08-07 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15311:
-
Description: The docs incorrectly state that reverting to ZooKeeper mode 
during KRaft migration is not possible

> Docs incorrectly state that reverting to ZooKeeper mode during KRaft 
> migration is not possible
> --
>
> Key: KAFKA-15311
> URL: https://issues.apache.org/jira/browse/KAFKA-15311
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Minor
>
> The docs incorrectly state that reverting to ZooKeeper mode during KRaft 
> migration is not possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15311) Fix docs about reverting to ZooKeeper mode during KRaft migration

2023-08-07 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-15311:


Assignee: Colin McCabe

> Fix docs about reverting to ZooKeeper mode during KRaft migration
> -
>
> Key: KAFKA-15311
> URL: https://issues.apache.org/jira/browse/KAFKA-15311
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Minor
>
> The docs incorrectly state that reverting to ZooKeeper mode during KRaft 
> migration is not possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe opened a new pull request, #14160: KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration

2023-08-07 Thread via GitHub


cmccabe opened a new pull request, #14160:
URL: https://github.com/apache/kafka/pull/14160

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15311) Docs incorrectly state that reverting to ZooKeeper mode during the migration is not possible

2023-08-07 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15311:


 Summary: Docs incorrectly state that reverting to ZooKeeper mode 
during the migration is not possible
 Key: KAFKA-15311
 URL: https://issues.apache.org/jira/browse/KAFKA-15311
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15311) Docs incorrectly state that reverting to ZooKeeper mode during KRaft migration is not possible

2023-08-07 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15311:
-
Summary: Docs incorrectly state that reverting to ZooKeeper mode during 
KRaft migration is not possible  (was: Docs incorrectly state that reverting to 
ZooKeeper mode during the migration is not possible)

> Docs incorrectly state that reverting to ZooKeeper mode during KRaft 
> migration is not possible
> --
>
> Key: KAFKA-15311
> URL: https://issues.apache.org/jira/browse/KAFKA-15311
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on a diff in pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-07 Thread via GitHub


gharris1727 commented on code in PR #14159:
URL: https://github.com/apache/kafka/pull/14159#discussion_r1286093427


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java:
##
@@ -16,24 +16,19 @@
  */
 package org.apache.kafka.connect.transforms;
 
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.source.SourceRecord;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TimestampRouterTest {
-private final TimestampRouter xform = new 
TimestampRouter<>();

Review Comment:
   This should probably be paired with a BeforeEach instead of duplicating it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14159: Kafka 15291 connect plugins implement versioned

2023-08-07 Thread via GitHub


gharris1727 commented on code in PR #14159:
URL: https://github.com/apache/kafka/pull/14159#discussion_r1286093427


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java:
##
@@ -16,24 +16,19 @@
  */
 package org.apache.kafka.connect.transforms;
 
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.source.SourceRecord;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TimestampRouterTest {
-private final TimestampRouter xform = new 
TimestampRouter<>();

Review Comment:
   This should probably be paired with a @BeforeEach instead of duplicating it.
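   
   A minimal sketch of that suggestion, assuming JUnit 5: construct the 
transform in a @BeforeEach and close it in an @AfterEach instead of repeating 
the construction in every test method:
   
   ```java
   package org.apache.kafka.connect.transforms;

   import org.apache.kafka.connect.source.SourceRecord;
   import org.junit.jupiter.api.AfterEach;
   import org.junit.jupiter.api.BeforeEach;

   public class TimestampRouterTest {
       private TimestampRouter<SourceRecord> xform;

       @BeforeEach
       public void setUp() {
           xform = new TimestampRouter<>();
       }

       @AfterEach
       public void tearDown() {
           xform.close();
       }

       // ...test methods use the xform created above...
   }
   ```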



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)

2023-08-07 Thread via GitHub


lucasbru commented on code in PR #14001:
URL: https://github.com/apache/kafka/pull/14001#discussion_r1286114522


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##
@@ -86,12 +87,29 @@ private void runOnce(final long nowMs) {
 
 if (currentTask == null) {
 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
-} else {
-// if a task is no longer processable, ask task-manager to 
give it another
-// task in the next iteration
-if (currentTask.isProcessable(nowMs)) {
+}
+
+if (currentTask != null) {

Review Comment:
   Hmm, that's a good point! Not sure if I'm worried about getting back null 
(we still need the check even with a condition variable), but it'd be good to 
avoid busy waiting. But we still want to shut down cleanly, so we still want to 
check isRunning and isPaused as well.
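   
   Illustrative only (none of these names are the PR's API): one way to block 
instead of busy-waiting while still honouring the shutdown and pause flags is a 
bounded wait on a condition variable:
   
   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;

   // Sketch of a task hand-off that avoids busy waiting but still re-checks
   // isRunning/isPaused so the executor can shut down cleanly.
   final class TaskWaiter {
       private final ReentrantLock lock = new ReentrantLock();
       private final Condition taskAvailable = lock.newCondition();
       private volatile boolean isRunning = true;
       private volatile boolean isPaused = false;
       private Runnable currentTask; // placeholder for the real task type

       Runnable awaitNextTask() throws InterruptedException {
           lock.lock();
           try {
               while (currentTask == null && isRunning && !isPaused) {
                   // Bounded wait so the flags are re-evaluated even without a signal.
                   taskAvailable.await(100, TimeUnit.MILLISECONDS);
               }
               return currentTask;
           } finally {
               lock.unlock();
           }
       }

       void offer(Runnable task) {
           lock.lock();
           try {
               currentTask = task;
               taskAvailable.signalAll();
           } finally {
               lock.unlock();
           }
       }
   }
   ```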



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #14001: Kafka Streams Threading: Punctuation (5/N)

2023-08-07 Thread via GitHub


lucasbru commented on code in PR #14001:
URL: https://github.com/apache/kafka/pull/14001#discussion_r1286114522


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java:
##
@@ -86,12 +87,29 @@ private void runOnce(final long nowMs) {
 
 if (currentTask == null) {
 currentTask = 
taskManager.assignNextTask(DefaultTaskExecutor.this);
-} else {
-// if a task is no longer processable, ask task-manager to 
give it another
-// task in the next iteration
-if (currentTask.isProcessable(nowMs)) {
+}
+
+if (currentTask != null) {

Review Comment:
   Hmm, that's a good point! Not sure if I'm worried about getting back null 
(we still need the check even with a condition variable), but it'd be good to 
avoid busy waiting. But we still want to shut down cleanly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #14158: KAFKA-15296: Allow offsets to be committed for filtered records when Exactly Once support is disabled

2023-08-07 Thread via GitHub


vamossagar12 commented on PR #14158:
URL: https://github.com/apache/kafka/pull/14158#issuecomment-1668174595

   Yash, Greg I have marked you as reviewers for this PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14421) OffsetFetchRequest throws NPE Exception

2023-08-07 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751726#comment-17751726
 ] 

Philip Nee commented on KAFKA-14421:


[~showuon] - should we close this issue, as it is not supported?

> OffsetFetchRequest throws NPE Exception
> ---
>
> Key: KAFKA-14421
> URL: https://issues.apache.org/jira/browse/KAFKA-14421
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: yws
>Assignee: yws
>Priority: Major
> Attachments: image-2022-11-27-22-28-52-165.png, 
> image-2022-11-27-22-41-45-358.png
>
>
> When I use a 0.10.2 client to send a Metadata request to a 0.10.0 server, an NPE 
> happens.
>  !image-2022-11-27-22-28-52-165.png! 
> The NPE quite confused me, because just sending a Metadata request does not cause 
> it. After troubleshooting the problem, it turns out that NetworkClient#poll calls 
> ConsumerNetworkClient#trySend and further calls NetworkClient#doSend when trying 
> to build the OffsetFetchRequest; because the 0.10.0 server does not support 
> fetching all TopicPartitions, it throws UnsupportedVersionException, 
> {code:java}
> private void doSend(ClientRequest clientRequest, boolean isInternalRequest, 
> long now) {
> String nodeId = clientRequest.destination();
> ..
> AbstractRequest request = null;
> AbstractRequest.Builder builder = clientRequest.requestBuilder();
> try {
> NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
> // Note: if versionInfo is null, we have no server version 
> information. This would be
> // the case when sending the initial ApiVersionRequest which 
> fetches the version
> // information itself.  It is also the case when 
> discoverBrokerVersions is set to false.
> if (versionInfo == null) {
> if (discoverBrokerVersions && log.isTraceEnabled())
> log.trace("No version information found when sending 
> message of type {} to node {}. " +
> "Assuming version {}.", clientRequest.apiKey(), 
> nodeId, builder.version());
> } else {
> short version = 
> versionInfo.usableVersion(clientRequest.apiKey());
> builder.setVersion(version);
> }
> // The call to build may also throw UnsupportedVersionException, 
> if there are essential
> // fields that cannot be represented in the chosen version.
> request = builder.build();
> } catch (UnsupportedVersionException e) {
> // If the version is not supported, skip sending the request over 
> the wire.
> // Instead, simply add it to the local queue of aborted requests.
> log.debug("Version mismatch when attempting to send {} to {}",
> clientRequest.toString(), clientRequest.destination(), e);
> ClientResponse clientResponse = new 
> ClientResponse(clientRequest.makeHeader(),
> clientRequest.callback(), clientRequest.destination(), 
> now, now,
> false, e, null);
> abortedSends.add(clientResponse);
> return;
> }
> {code}
>  !image-2022-11-27-22-41-45-358.png! 
> Until now, everything is as expected. But unfortunately, in the catch block for 
> UnsupportedVersionException, clientRequest.toString needs to call 
> requestBuilder#toString, that is, OffsetFetchRequest's Builder#toString; when the 
> partition is ALL_TOPIC_PARTITIONS, it is null, and therefore it causes the 
> unexpected NPE and makes the normal MetadataRequest fail.  
> {code:java}
> catch (UnsupportedVersionException e) {
>  
> log.debug("Version mismatch when attempting to send {} to {}",
> clientRequest.toString(), clientRequest.destination(), e);
> ClientResponse clientResponse = new 
> ClientResponse(clientRequest.makeHeader(),
> clientRequest.callback(), clientRequest.destination(), 
> now, now,
> false, e, null);
> abortedSends.add(clientResponse);
> return;
> }
> ClientRequest#toString()
>public String toString() {
> return "ClientRequest(expectResponse=" + expectResponse +
> ", callback=" + callback +
> ", destination=" + destination +
> ", correlationId=" + correlationId +
> ", clientId=" + clientId +
> ", createdTimeMs=" + createdTimeMs +
> ", requestBuilder=" + requestBuilder +
> ")";
> }
>   OffsetFetchRequest's Builder#toString
> public String toString() {
> StringBuilder bld = n

[GitHub] [kafka] kamalcph commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286092708


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##
@@ -104,6 +106,22 @@ public class RemoteLogMetadataCache {
 // https://issues.apache.org/jira/browse/KAFKA-12641
 protected final ConcurrentMap 
leaderEpochEntries = new ConcurrentHashMap<>();
 
+private final CountDownLatch initializedLatch = new CountDownLatch(1);
+
+public void markInitialized() {
+initializedLatch.countDown();
+}
+
+public void ensureInitialized() throws InterruptedException {
+if (!initializedLatch.await(2, TimeUnit.MINUTES)) {

Review Comment:
   We can reduce the timeout to 10 seconds, but we should think of a solution for 
the Admin#listOffsets call.
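   
   For illustration only (not the PR's code): letting each caller pick its own 
wait bound keeps the latch-based gate but allows a request path such as 
Admin#listOffsets to use a much shorter timeout than a broker-startup path:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   // Sketch of an initialization gate whose wait bound is chosen by the caller.
   final class InitializationGate {
       private final CountDownLatch initialized = new CountDownLatch(1);

       void markInitialized() {
           initialized.countDown();
       }

       void ensureInitialized(long timeout, TimeUnit unit)
               throws InterruptedException, TimeoutException {
           if (!initialized.await(timeout, unit)) {
               throw new TimeoutException("Metadata cache not initialized within " + timeout + " " + unit);
           }
       }
   }
   ```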



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286091923


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+private final int numMetadataTopicPartitions = 5;
+private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+private final DummyEventHandler handler = new DummyEventHandler();
+private final Set remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+private final Uuid topicId = Uuid.randomUuid();
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+private ConsumerTask consumerTask;
+private MockConsumer consumer;
+private Thread thread;
+
+@BeforeEach
+public void beforeEach() {
+final Map offsets = remoteLogPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), e -> 0L));
+consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+consumer.updateBeginningOffsets(offsets);
+ConsumerTask.pollIntervalMs = 10L;
+consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+thread = new Thread(consumerTask);
+}
+
+@AfterEach
+public void afterEach() throws InterruptedException {
+if (thread != null) {
+consumerTask.close();
+thread.join();
+}
+}
+
+@Test
+public void testCloseOnNoAssignment() throws InterruptedException {
+thread.start();
+Thread.sleep(10);
+

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286090883


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+private final int numMetadataTopicPartitions = 5;
+private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+private final DummyEventHandler handler = new DummyEventHandler();
+private final Set remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+.map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+private final Uuid topicId = Uuid.randomUuid();
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+private ConsumerTask consumerTask;
+private MockConsumer consumer;
+private Thread thread;
+
+@BeforeEach
+public void beforeEach() {
+final Map offsets = remoteLogPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), e -> 0L));
+consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+consumer.updateBeginningOffsets(offsets);
+ConsumerTask.pollIntervalMs = 10L;
+consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+thread = new Thread(consumerTask);
+}
+
+@AfterEach
+public void afterEach() throws InterruptedException {
+if (thread != null) {
+consumerTask.close();
+thread.join();
+}
+}
+
+@Test
+public void testCloseOnNoAssignment() throws InterruptedException {
+thread.start();
+Thread.sleep(10);

Revi

[GitHub] [kafka] abhijeetk88 commented on a diff in pull request #14127: KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions

2023-08-07 Thread via GitHub


abhijeetk88 commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1286089506


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
 private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-private static final long POLL_INTERVAL_MS = 100L;
+static long pollIntervalMs = 100L;
 
 private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-private final KafkaConsumer consumer;
-private final String metadataTopicName;
+private final Consumer consumer;
 private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
 private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-private final Time time;
+private final Time time = new SystemTime();
 
+// TODO - Update comments below
 // It indicates whether the closing process has been started or not. If it 
is set as true,
 // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-private volatile boolean closing = false;
-
+private volatile boolean isClosed = false;
 // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
 // determined that the consumer needs to be assigned with the updated 
partitions.
-private volatile boolean assignPartitions = false;
+private volatile boolean isAssignmentChanged = true;
 
 // It represents a lock for any operations related to the 
assignedTopicPartitions.
 private final Object assignPartitionsLock = new Object();
 
 // Remote log metadata topic partitions that consumer is assigned to.
-private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+private volatile Set assignedMetadataPartitions = 
Collections.emptySet();
 
 // User topic partitions that this broker is a leader/follower for.
-private Set assignedTopicPartitions = 
Collections.emptySet();
+private volatile Map 
assignedUserTopicIdPartitions = Collections.emptyMap();
+private volatile Set 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-// may or may not have been processed based on the assigned topic 
partitions.
-private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+private long uninitializedAt = time.milliseconds();
+private boolean isAllUserTopicPartitionsInitialized;
 
-// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+private final Map readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-private final long committedOffsetSyncIntervalMs;
-private CommittedOffsetsFile committedOffsetsFile;
-private long lastSyncedTimeMs;
+private Map 
offsetHolderByMetadataPartition = new HashMap<>();
+private boolean isOffsetsFetchFailed = false;
+private long lastFailedFetchOffsetsTimestamp;
 
-public ConsumerTask(KafkaConsumer consumer,
-String metadataTopicName,
-RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
 RemoteLogMetadataTopicPartitioner topicPartitioner,
-Path committedOffsetsPath,
-Time time,
-long committedOffsetSyncIntervalMs) {
-this.consumer = Objects.requireNonNull(consumer);
-this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+Function, Consumer> 
consumerSupplier) {
+this.consumer = consumerSupplier.apply(Optional.empty());
 this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
 this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-this.time = Objects.requireNonNull(time);
-this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-initializeConsumerAssignment(committedOffsetsPath);
-}
-
-private void initializeConsumerAssignment(Path committedOffsetsPath) {
-try {
-committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-} catch (IOException e) {
-throw new KafkaException(e);
-}
-
-Map committedOffsets = Collections.emptyMap();
-try {
-// Load committed offset a

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286059074


##
core/src/main/scala/kafka/server/KafkaBroker.scala:
##
@@ -19,6 +19,7 @@ package kafka.server
 
 import com.yammer.metrics.core.MetricName
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager

Review Comment:
   This import is being used in L83.
   
   Removed the unused import "scala.collection.Seq" from this class, but it 
threw the error below, so I reverted it back since this change is unrelated to 
this PR:
   
   ```
   ❯ ./gradlew :storage:compileTestJava -PscalaVersion=2.13
   
   > Configure project :
   Starting build with version 3.6.0-SNAPSHOT (commit id d109f88d) using Gradle 
8.2.1, Java 11 and Scala 2.13.11
   Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:compileScala
   [Error] 
/Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:910:69:
 type mismatch;
found   : 
scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
required: Seq[AnyRef]
   [Error] 
/Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:915:61:
 type mismatch;
found   : 
scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
required: Seq[AnyRef]
   [Error] 
/Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:267:77:
 type mismatch;
found   : Seq[Object] (in scala.collection) 
required: Seq[AnyRef] (in scala.collection.immutable) 
   [Error] 
/Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:117:15:
 private val kafkaMetricsReporters in class KafkaServer is never used
   four errors found
   ```



##
core/src/main/scala/kafka/server/KafkaBroker.scala:
##
@@ -19,6 +19,7 @@ package kafka.server
 
 import com.yammer.metrics.core.MetricName
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager

Review Comment:
   This import is being used in L83.
   
   Removed the unused import "scala.collection.Seq" from this class, but it 
threw the error below, so I reverted it back since the change is unrelated to 
this PR:
   
   ```
   ❯ ./gradlew :storage:compileTestJava -PscalaVersion=2.13
   
   > Configure project :
   Starting build with version 3.6.0-SNAPSHOT (commit id d109f88d) using Gradle 
8.2.1, Java 11 and Scala 2.13.11
   Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:compileScala
   [Error] 
/Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:910:69:
 type mismatch;
found   : 
scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
required: Seq[AnyRef]
   [Error] 
/Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:915:61:
 type mismatch;
found   : 
scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
required: Seq[AnyRef]
   [Error] 
/Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:267:77:
 type mismatch;
found   : Seq[Object] (in scala.collection) 
required: Seq[AnyRef] (in scala.collection.immutable) 
   [Error] 
/Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:117:15:
 private val kafkaMetricsReporters in class KafkaServer is never used
   four errors found
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on PR #14116:
URL: https://github.com/apache/kafka/pull/14116#issuecomment-1668144105

   > This leads to a situation where the tests we add will not be runnable with 
kraft/zk mode easily.
   
   We are not using the ZooKeeper client directly anywhere, so KRaft mode should 
work as expected.
   
   > My main concern is that we are using IntegrationTestHarness but 
duplicating a lot of it's code in our own implementation in TieredStorageContext
   
   Agreed, there is some code duplication, but it gives us the flexibility to reuse 
the client, which avoids log pollution while debugging a failed test.
   
   >  unnecessary specs and actions
   
   The plan is to add the respective tests, such as ReassignReplicaShrinkTest, 
ReassignReplicaExpandTest, ReassignReplicaMoveTest, PartitionsExpandTest, 
DeleteSegmentsByRetentionTimeTest, ListOffsetsTest, and so on, once the build 
becomes stable. We can remove the unused actions if required.
   
   Thanks for reviewing the PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286072822


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import 
org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import 
org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import 
org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+private final int defaultProducedBatchSize = 1;
+private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
+private Map<TopicPartition, List<OffloadableSpec>> offloadables = new HashMap<>();
+private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
+private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
+private Map<TopicPartition, List<DeletableSpec>> deletables = new HashMap<>();
+private List<TieredStorageTestAction> actions = new ArrayList<>();
+
+public TieredStorageTestBuilder() {
+}
+
+public TieredStorageTestBuilder createTopic(String topic,
+                                            Integer partitionCount,
+                                            Integer replicationFactor,
+                                            Integer maxBatchCountPerSegment,
+                                            Map<Integer, List<Integer>> replicaAssignment,
+                                            Boolean enableRemoteLogStorage) {
+    assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1";
+    assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
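
   One thing worth noting about the quoted hunk: Java assert statements are disabled at runtime by default and only run when the JVM is started with -ea, so these checks can be silently skipped. A minimal sketch of equivalent explicit validation (the helper class and method names below are hypothetical and not part of this PR):

    // Hypothetical helper, not from the PR: explicit argument validation that
    // fails regardless of whether assertions are enabled on the JVM.
    final class CreateTopicArgsValidator {
        private CreateTopicArgsValidator() { }

        static void validate(String topic, int partitionCount, int maxBatchCountPerSegment) {
            if (maxBatchCountPerSegment < 1) {
                throw new IllegalArgumentException("Segments size for topic " + topic + " needs to be >= 1");
            }
            if (partitionCount < 1) {
                throw new IllegalArgumentException("Partition count for topic " + topic + " needs to be >= 1");
            }
        }
    }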

[GitHub] [kafka] vamossagar12 commented on pull request #14000: IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments

2023-08-07 Thread via GitHub


vamossagar12 commented on PR #14000:
URL: https://github.com/apache/kafka/pull/14000#issuecomment-1668141669

   Test failures are unrelated.





[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286074651


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+private final int defaultProducedBatchSize = 1;
+private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
+private Map<TopicPartition, List<OffloadableSpec>> offloadables = new HashMap<>();
+private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
+private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
+private Map<TopicPartition, List<DeletableSpec>> deletables = new HashMap<>();
+private List<TieredStorageTestAction> actions = new ArrayList<>();
+
+public TieredStorageTestBuilder() {
+}
+
+public TieredStorageTestBuilder createTopic(String topic,
+                                            Integer partitionCount,
+                                            Integer replicationFactor,
+                                            Integer maxBatchCountPerSegment,
+                                            Map<Integer, List<Integer>> replicaAssignment,
+                                            Boolean enableRemoteLogStorage) {
+    assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1";
+    assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
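
   For context on how this builder is meant to be driven: createTopic returns the builder itself, so calls can be chained. A minimal usage sketch based only on the signature quoted above; the wrapper class name, topic name, and values are made up for illustration, and the snippet assumes the class under review is importable:

    import java.util.Collections;

    // Illustrative only: one call to the builder's createTopic with made-up values.
    class TieredStorageBuilderUsageSketch {
        static TieredStorageTestBuilder buildExample() {
            return new TieredStorageTestBuilder()
                .createTopic(
                    "topicA",                // topic
                    1,                       // partitionCount
                    1,                       // replicationFactor
                    2,                       // maxBatchCountPerSegment
                    Collections.emptyMap(),  // replicaAssignment (empty map, purely for illustration)
                    true);                   // enableRemoteLogStorage
        }
    }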
