Re: [PR] KAFKA-16724: Added support for fractional throughput and monotonic payload in kafka-producer-perf-test.sh [kafka]

2024-06-10 Thread via GitHub


omkreddy merged PR #16238:
URL: https://github.com/apache/kafka/pull/16238


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Apply spotless to `tools` and `tools-api` module [kafka]

2024-06-10 Thread via GitHub


gongxuanzhang commented on PR #16262:
URL: https://github.com/apache/kafka/pull/16262#issuecomment-2159837707

   @chia7712 please take a look, thank you!





Re: [PR] KAFKA-10787: Apply spotless to `tools` and `tools-api` module [kafka]

2024-06-10 Thread via GitHub


gongxuanzhang commented on PR #16262:
URL: https://github.com/apache/kafka/pull/16262#issuecomment-2159836214

   # Failed tests
   
   ## JDK 21 and Scala 2.13
   | test | jira |
   |---|---|
   | testOutdatedCoordinatorAssignment | https://issues.apache.org/jira/browse/KAFKA-15900 |
   | testOffsetTranslationBehindReplicationFlow | https://issues.apache.org/jira/browse/KAFKA-15197 |
   | testTaskRequestWithOldStartMsGetsUpdated | https://issues.apache.org/jira/browse/KAFKA-15760 |
   
   ## JDK 8 and Scala 2.12
   | test | jira |
   |---|---|
   | testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin | https://issues.apache.org/jira/browse/KAFKA-14453 |
   
   
   





Re: [PR] KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically [kafka]

2024-06-10 Thread via GitHub


kamalcph commented on PR #16203:
URL: https://github.com/apache/kafka/pull/16203#issuecomment-2159792782

   > @kamalcph Can you raise a PR against the 3.8 branch? I would like those changes to be reviewed in a PR, since they conflict with that branch.
   
   Opened #16275 and #16276 to cherry-pick the KAFKA-15776 feature to the 3.8 branch. PTAL.
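   
   For illustration only (not part of the PR): a hedged sketch of how a dynamically reconfigurable broker property such as remote.fetch.max.wait.ms could be updated at runtime via the Admin API. The bootstrap address and the value are placeholders.
   
   ```
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;
   
   import java.util.Collections;
   import java.util.Properties;
   
   public class DynamicRemoteFetchConfigExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // An empty name targets the cluster-wide default broker resource.
               ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "");
               AlterConfigOp op = new AlterConfigOp(
                       new ConfigEntry("remote.fetch.max.wait.ms", "5000"),
                       AlterConfigOp.OpType.SET);
               // Incremental alter only touches the one entry we specify.
               admin.incrementalAlterConfigs(
                       Collections.singletonMap(broker, Collections.singletonList(op)))
                    .all().get();
           }
       }
   }
   ```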





Re: [PR] KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically (#16203) [kafka]

2024-06-10 Thread via GitHub


kamalcph commented on PR #16276:
URL: https://github.com/apache/kafka/pull/16276#issuecomment-2159792056

   This PR is built on top of #16275 





[PR] KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically (#16203) [kafka]

2024-06-10 Thread via GitHub


kamalcph opened a new pull request, #16276:
URL: https://github.com/apache/kafka/pull/16276

   Reviewers: Satish Duggana, Luke Chen
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout (#14778) [kafka]

2024-06-10 Thread via GitHub


kamalcph opened a new pull request, #16275:
URL: https://github.com/apache/kafka/pull/16275

   KIP-1018, part 1: Introduce remote.fetch.max.timeout.ms to configure the DelayedRemoteFetch timeout
   
   Reviewers: Luke Chen 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]

2024-06-10 Thread via GitHub


m1a2st commented on code in PR #16244:
URL: https://github.com/apache/kafka/pull/16244#discussion_r1634156488


##
storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.InvalidOffsetException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetIndexTest {
+
+    private OffsetIndex index;
+    private static final long BASE_OFFSET = 45L;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        this.index.close();

Review Comment:
   I think `index` needs a null check to avoid an NPE, so I moved it into the if statement.
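   
   A minimal sketch of the null-guarded tearDown being described (illustrative; the class name and the `Closeable` stand-in are hypothetical, not the PR's exact code):
   
   ```
   import org.junit.jupiter.api.AfterEach;
   
   import java.io.Closeable;
   import java.io.IOException;
   
   public class GuardedTearDownSketch {
       private Closeable index; // may still be null if setup() failed
   
       @AfterEach
       public void tearDown() throws IOException {
           // Guard the close() call so a failed setup() does not trigger an NPE here.
           if (index != null) {
               index.close();
           }
       }
   }
   ```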






Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]

2024-06-10 Thread via GitHub


m1a2st commented on code in PR #16244:
URL: https://github.com/apache/kafka/pull/16244#discussion_r1634155239


##
storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetMapTest.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.ByteBuffer;
+import java.security.DigestException;
+import java.security.NoSuchAlgorithmException;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OffsetMapTest {
+
+    private static final int MEMORY_SIZE = 4096;
+
+    @ParameterizedTest
+    @ValueSource(ints = {10, 100, 1000, 5000})
+    public void testBasicValidation(int items) throws NoSuchAlgorithmException {
+        validateMap(items);
+    }
+
+    @Test
+    public void testClear() throws NoSuchAlgorithmException {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        IntStream.range(0, 10).forEach(i -> assertDoesNotThrow(() -> map.put(key(i), i)));
+        IntStream.range(0, 10).forEach(i -> {
+            try {
+                assertEquals(i, map.get(key(i)));
+            } catch (DigestException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        map.clear();
+        IntStream.range(0, 10).forEach(i -> {
+            try {
+                assertEquals(-1, map.get(key(i)));
+            } catch (DigestException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Test
+    public void testGetWhenFull() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        int i = 37;
+        while (map.size() < map.slots()) {
+            map.put(key(i), i);
+            i = i + 1;
+        }
+        assertEquals(map.get(key(i)), -1);
+        assertEquals(map.get(key(i - 1)), i - 1);
+    }
+
+    @Test
+    public void testUpdateLatestOffset() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        int i = 37;
+        while (map.size() < map.slots()) {
+            map.put(key(i), i);
+            i = i + 1;
+        }
+        int lastOffsets = 40;
+        assertEquals(map.get(key(i - 1)), i - 1);
+        map.updateLatestOffset(lastOffsets);
+        assertEquals(map.get(key(lastOffsets)), lastOffsets);
+    }
+
+    @Test
+    public void testLatestOffset() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        int i = 37;
+        while (map.size() < map.slots()) {
+            map.put(key(i), i);
+            i = i + 1;
+        }
+        assertEquals(map.latestOffset(), i - 1);
+    }
+
+    private ByteBuffer key(Integer key) {
+        return ByteBuffer.wrap(key.toString().getBytes());
+    }
+
+    private void validateMap(int items) throws NoSuchAlgorithmException {

Review Comment:
   I will merge this method into `testBasicValidation`






Re: [PR] MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs [kafka]

2024-06-10 Thread via GitHub


gharris1727 commented on PR #16273:
URL: https://github.com/apache/kafka/pull/16273#issuecomment-2159746582

   I still see some remaining flakiness in this test.
   
   It fails ~10% of the time at 30% CPU, rising to ~70% of the time at 15% CPU.
   The failures are mostly this one:
   ```
   caught: org.apache.kafka.connect.errors.DataException: Insufficient records committed by connector simple-connector in 300 millis. Records expected=8, actual=0
   at org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:213)
   at org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testReconfigureConnectorWithFailingTaskConfigs(ConnectWorkerIntegrationTest.java:1292)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   ```
   with a handful of these two:
   ```
   caught: java.lang.AssertionError: Connector tasks were not restarted in time
   at org.junit.Assert.fail(Assert.java:89)
   at org.junit.Assert.assertTrue(Assert.java:42)
   at org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testReconfigureConnectorWithFailingTaskConfigs(ConnectWorkerIntegrationTest.java:1310)
   ```
   ```
   caught: org.apache.kafka.connect.errors.DataException: Insufficient records committed by connector simple-connector in 300 millis. Records expected=1, actual=0
   at org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:213)
   at org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testReconfigureConnectorWithFailingTaskConfigs(ConnectWorkerIntegrationTest.java:1317)
   ```
   
   I'll look into this more tomorrow if you need some more info.





Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-10 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1634144659


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,155 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
     @Test
-    public void iterator() throws Exception {
+    public void testIterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 15;
+        int emptyPartitionIndex = 3;
+        ConsumerRecords<Integer, String> records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic));
+        Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();
 
-        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
+        int recordCount = 0;
+        int partitionCount = 0;
+        int currentPartition = -1;
 
-        String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new ArrayList<>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME,
-            0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME,
-            0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-        ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
-        Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
-
-        int c = 0;
-        for (; iter.hasNext(); c++) {
-            ConsumerRecord<Integer, String> record = iter.next();
-            assertEquals(1, record.partition());
-            assertEquals(topic, record.topic());
-            assertEquals(c, record.offset());
+        while (iterator.hasNext()) {
+            ConsumerRecord<Integer, String> record = iterator.next();
+            validateEmptyPartition(record, emptyPartitionIndex);
+
+            // Check if we have moved to a new partition
+            if (currentPartition != record.partition()) {
+                // Increment the partition count as we have encountered a new partition
+                partitionCount++;
+                // Update the current partition to the new partition
+                currentPartition = record.partition();
+            }
+
+            validateRecordPayload(topic, record, currentPartition, recordCount, recordSize);
+            recordCount++;
+        }
+
+        // Including empty partition
+        assertEquals(partitionSize, partitionCount + 1);
+    }
+
+    @Test
+    public void testRecordsByPartition() {
+        List<String> topics = Arrays.asList("topic1", "topic2");
+        int recordSize = 3;
+        int partitionSize = 5;
+        int emptyPartitionIndex = 2;
+
+        ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+            for (int partition = 0; partition < partitionSize; partition++) {
+                TopicPartition topicPartition = new TopicPartition(topic, partition);
+                List<ConsumerRecord<Integer, String>> records = consumerRecords.records(topicPartition);
+
+                if (partition == emptyPartitionIndex) {
+                    assertTrue(records.isEmpty());
+                } else {
+                    assertEquals(recordSize, records.size());
+                    for (int i = 0; i < records.size(); i++) {
+                        ConsumerRecord<Integer, String> record = records.get(i);
+                        validateRecordPayload(topic, record, partition, i, recordSize);
+                    }
+                }
+            }
         }
-        assertEquals(2, c);
+    }
+
+    @Test
+    public void testRecordsByNullTopic() {
+        String nullTopic = null;
+        ConsumerRecords<Integer, String> consumerRecords = ConsumerRecords.empty();
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic));
+        assertEquals("Topic must be non-null.", exception.getMessage());
+    }
+
+    @Test
+    public void testRecordsByTopic() {
+        List<String> topics = Arrays.asList("topic1", "topic2", "topic3", "topic4");
+        int recordSize = 3;
+        int partitionSize = 10;
+        int emptyPartitionIndex = 6;
+        int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1);
+
+        ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+

Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #16116:
URL: https://github.com/apache/kafka/pull/16116#discussion_r1634135771


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.compress.Lz4Compression;
+import org.apache.kafka.common.compress.ZstdCompression;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfigs;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfigs;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersionValidator;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
+import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+/**
+ * During the move of {@link kafka.server.KafkaConfig} out of core we will have two KafkaConfig classes.
+ * org.apache.kafka.server.config.KafkaConfig will be the future KafkaConfig, so any new getters or updates for `CONFIG_DEF` will be defined here.
+ * Any code that depends on kafka.server.KafkaConfig will keep using kafka.server.KafkaConfig for the time being, until we move it out of core.
+ * For more details check KAFKA-15853.
+ */
+abstract public class KafkaConfig extends AbstractConfig {

Review Comment:
   It seems `public abstract class` is used more frequently than `abstract public class`. Maybe we can align it?
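   
   A minimal sketch of the suggested modifier order (illustrative only; the class name is a stand-in):
   
   ```
   // Conventional order per the JLS style recommendation: the access
   // modifier comes first, then `abstract`.
   public abstract class ExampleConfig {
       public abstract int brokerId();
   }
   ```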






Re: [PR] KAFKA-16897: Move OffsetIndexTest and OffsetMapTest to storage module [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #16244:
URL: https://github.com/apache/kafka/pull/16244#discussion_r1634131546


##
storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java:
##
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.InvalidOffsetException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetIndexTest {
+
+    private OffsetIndex index;
+    private static final long BASE_OFFSET = 45L;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        this.index.close();

Review Comment:
   As we have a null check, this method can produce an NPE, right?



##
storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetMapTest.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.ByteBuffer;
+import java.security.DigestException;
+import java.security.NoSuchAlgorithmException;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OffsetMapTest {
+
+    private static final int MEMORY_SIZE = 4096;
+
+    @ParameterizedTest
+    @ValueSource(ints = {10, 100, 1000, 5000})
+    public void testBasicValidation(int items) throws NoSuchAlgorithmException {
+        validateMap(items);
+    }
+
+    @Test
+    public void testClear() throws NoSuchAlgorithmException {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        IntStream.range(0, 10).forEach(i -> assertDoesNotThrow(() -> map.put(key(i), i)));
+        IntStream.range(0, 10).forEach(i -> {
+            try {
+                assertEquals(i, map.get(key(i)));
+            } catch (DigestException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        map.clear();
+        IntStream.range(0, 10).forEach(i -> {
+            try {
+                assertEquals(-1, map.get(key(i)));
+            } catch (DigestException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Test
+    public void testGetWhenFull() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        int i = 37;
+        while (map.size() < map.slots()) {
+            map.put(key(i), i);
+            i = i + 1;
+        }
+        assertEquals(map.get(key(i)), -1);
+

Re: [PR] MINOR: migrate DescribeConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #15908:
URL: https://github.com/apache/kafka/pull/15908#discussion_r1634130364


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -16,833 +16,962 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.ConsumerGroupState;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.test.TestUtils.RANDOM;
-import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
-public class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
+@Tag("integration")
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DescribeConsumerGroupTest {
+    private static final String TOPIC_PREFIX = "test.topic.";
+    private static final String GROUP_PREFIX = "test.group.";
     private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = Arrays.asList(Collections.singletonList(""), Collections.singletonList("--offsets"));
     private static final List<List<String>> DESCRIBE_TYPE_MEMBERS = Arrays.asList(Collections.singletonList("--members"), Arrays.asList("--members", "--verbose"));
     private static final List<List<String>> DESCRIBE_TYPE_STATE = Collections.singletonList(Collections.singletonList("--state"));
-    private static final List<List<String>> DESCRIBE_TYPES;
+    private static final List<List<String>> DESCRIBE_TYPES = Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, DESCRIBE_TYPE_STATE).flatMap(Collection::stream).collect(Collectors.toList());
+    private final ClusterInstance clusterInstance;
 
-    static {
-        List<List<String>> describeTypes = new ArrayList<>();
-
-        describeTypes.addAll(DESCRIBE_TYPE_OFFSETS);
-        describeTypes.addAll(DESCRIBE_TYPE_MEMBERS);
-        describeTypes.addAll(DESCRIBE_TYPE_STATE);
+    DescribeConsumerGroupTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
 
-        DESCRIBE_TYPES = describeTypes;
+    private static List<ClusterConfig> generator() {
+        return ConsumerGroupCommandTestUtils.generator();
     }
 
-    @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
-    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
-    public void testDescribeNonExistingGroup(String quorum, String groupProtocol) {
-        createOffsetsTopic(listenerName(), new Properties());
+    @ClusterTemplate("generator")
+    public void testDescribeNonExistingGroup() {
         String missingGroup = "missing.group";
 
         for (List<String> describeType : DESCRIBE_TYPES) {
             // note the group to be queried is a different (non-existing) group

[jira] [Commented] (KAFKA-16929) Conside defining kafka-specified assertion to unify testing style

2024-06-10 Thread Chia-Ping Tsai (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853873#comment-17853873 ]

Chia-Ping Tsai commented on KAFKA-16929:


{quote}
Therefore, we can try to add some code checks and disable one of hamcrest and junit
{quote}

Sorry, I misunderstood your point. Yes, you are right: other modules should be disallowed from using hamcrest/junit if we decide to have Kafka-specific assertions.

> Conside defining kafka-specified assertion to unify testing style
> ------------------------------------------------------------------
>
>                 Key: KAFKA-16929
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16929
>             Project: Kafka
>          Issue Type: Wish
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> There are many contributors trying to fix the chaos of Kafka testing. That includes the following huge works:
>  # replace powermock/easymock by mockito (KAFKA-7438)
>  # replace junit 4 assertions by junit 5 (KAFKA-7339)
> It took us 6 years to complete the migration for task 1. The second task is in progress and I hope it can be addressed in 4.0.0.
> When reviewing, I noticed there are many different tastes in the code base. That is why task 1 was so difficult. Now, the rewriting of "assertion" is facing the same issue, and I feel the usage of "assertion" is even more awkward than "mockito" for the following reasons:
>  # there are two "different" assertion styles in the code base - hamcrest and junit - which is confusing to developers
>  # https://github.com/apache/kafka/pull/15730#discussion_r1567676845
>  # third-party assertions do not offer good error messages, so we need to use a non-common style to get useful output: https://github.com/apache/kafka/pull/16253#discussion_r1633406693
> IMHO, we should consider having our own Kafka-specific assertion style. That can bring the following benefits:
>  # unify the assertion style of the whole project
>  # apply customized assertions, for example:
>  ## assertEqual(List, List, F)
>  ## assertTrue(Supplier, Duration) - equivalent to `TestUtils.waitForCondition`
>  # auto-generate useful error messages. For example: assertEqual(0, list) -> print the list
> In short, I'd like to add a new module to define common assertions, and then apply it to the code base slowly.
> All feedback/responses/objections are welcomed :)
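
A minimal sketch (purely illustrative; the `KafkaAssertions` class and its signatures are hypothetical, not an existing Kafka API) of the kind of helper proposed above, e.g. assertTrue(Supplier, Duration):

```
import java.time.Duration;
import java.util.function.Supplier;

public final class KafkaAssertions {

    private KafkaAssertions() { }

    // Poll the condition until it holds or the timeout elapses, similar
    // in spirit to TestUtils.waitForCondition.
    public static void assertTrue(Supplier<Boolean> condition, Duration timeout) {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < deadlineMs) {
            if (Boolean.TRUE.equals(condition.get())) {
                return;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for condition", e);
            }
        }
        throw new AssertionError("Condition not met within " + timeout);
    }
}
```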



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16929) Conside defining kafka-specified assertion to unify testing style

2024-06-10 Thread Chia-Ping Tsai (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853874#comment-17853874 ]

Chia-Ping Tsai commented on KAFKA-16929:


{quote}
We could consider something like https://docs.openrewrite.org/recipes/java/testing/assertj/junittoassertj if we think assertj (or hamcrest) is better than the built-in assertions.
{quote}

I will take a look; thanks for this information.






[jira] [Commented] (KAFKA-16929) Conside defining kafka-specified assertion to unify testing style

2024-06-10 Thread dujian0068 (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853863#comment-17853863 ]

dujian0068 commented on KAFKA-16929:


I agree with your point that we need to unify the "usage". But I think Kafka's own assertions cannot achieve this goal, because someone will always use junit and hamcrest in the future, unless they are disabled or some code checks are added to prevent the junit and hamcrest assertion classes from being used.

Therefore, we can try to add some code checks and disable one of hamcrest and junit.

Chia-Ping Tsai (Jira) wrote on Tue, Jun 11, 2024, 09:42:








Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-10 Thread via GitHub


kirktrue commented on code in PR #16241:
URL: https://github.com/apache/kafka/pull/16241#discussion_r1634066822


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -514,10 +514,11 @@ private void fetchOffsetsWithRetries(final OffsetFetchRequestState fetchRequest,
 
         // Retry the same fetch request while it fails with RetriableException and the retry timeout hasn't expired.
         currentResult.whenComplete((res, error) -> {
-            boolean inflightRemoved = pendingRequests.inflightOffsetFetches.remove(fetchRequest);
-            if (!inflightRemoved) {
-                log.warn("A duplicated, inflight, request was identified, but unable to find it in the " +
-                    "outbound buffer:" + fetchRequest);
+            if (!fetchRequest.isExpired()) {
+                boolean inflightRemoved = pendingRequests.inflightOffsetFetches.remove(fetchRequest);
+                if (!inflightRemoved) {
+                    log.warn("The response for the offset fetch request for partitions {} was not found in the inflight buffer", fetchRequest.requestedPartitions);
+                }

Review Comment:
   It is removed later, when the user calls `addOffsetFetchRequest()` with a set of partitions that doesn't match any in the unsent or inflight queues.
   
   However, this change has been reverted; I'll double-check how the new approach handles it.
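   
   A generic sketch of the dedup pattern under discussion (simplified and hypothetical; this is not the actual CommitRequestManager code, and the names are illustrative):
   
   ```
   import java.util.Optional;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentLinkedQueue;
   
   class InflightDedupSketch<R> {
       static final class Request<R> {
           final String key; // identifies "the same request" (e.g. the partition set)
           final CompletableFuture<R> future = new CompletableFuture<>();
           Request(String key) { this.key = key; }
   
           // Complete `other` with whatever outcome this request's future gets.
           void chainFuture(CompletableFuture<R> other) {
               future.whenComplete((res, err) -> {
                   if (err != null) other.completeExceptionally(err);
                   else other.complete(res);
               });
           }
       }
   
       private final Queue<Request<R>> inflightRequests = new ConcurrentLinkedQueue<>();
   
       CompletableFuture<R> add(Request<R> incoming) {
           Optional<Request<R>> inflight = inflightRequests.stream()
                   .filter(r -> r.key.equals(incoming.key))
                   .findAny();
           if (inflight.isPresent()) {
               // Duplicate: piggyback on the inflight request's response.
               inflight.get().chainFuture(incoming.future);
           } else {
               inflightRequests.add(incoming); // becomes a new inflight entry
           }
           return incoming.future;
       }
   }
   ```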






Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-10 Thread via GitHub


kirktrue commented on code in PR #16241:
URL: https://github.com/apache/kafka/pull/16241#discussion_r1634062217


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1132,12 +1133,26 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch
         Optional<OffsetFetchRequestState> inflight =
             inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
 
-        if (dupe.isPresent() || inflight.isPresent()) {
+        if (dupe.isPresent()) {
             log.debug("Duplicated unsent offset fetch request found for partitions: {}", request.requestedPartitions);
-            dupe.orElseGet(inflight::get).chainFuture(request.future);
+            dupe.get().chainFuture(request.future);
+        } else if (inflight.isPresent()) {
+            log.debug("Duplicated inflight offset fetch request found for partitions: {}", request.requestedPartitions);
+            OffsetFetchRequestState existing = inflight.get();
+            existing.chainFuture(request.future);
+
+            if (existing.future.isDone()) {
+                boolean inflightRemoved = inflightOffsetFetches.remove(existing);
+                if (!inflightRemoved)
+                    log.warn("The offset fetch request for partitions {} was not found in the inflight buffer", request.requestedPartitions);
+            }
         } else {
             log.debug("Enqueuing offset fetch request for partitions: {}", request.requestedPartitions);
             this.unsentOffsetFetches.add(request);
+
+            // The incoming offset fetch request isn't in the unsent or inflight buffers, which means we don't
+            // need to keep track of the entry in the inflight buffer any longer.
+            inflightOffsetFetches.removeIf(r -> request.isExpired());

Review Comment:
   🤦‍♂️






Re: [PR] KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively [kafka]

2024-06-10 Thread via GitHub


kirktrue commented on code in PR #16241:
URL: https://github.com/apache/kafka/pull/16241#discussion_r1634061316


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1132,12 +1133,26 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetch
         Optional<OffsetFetchRequestState> inflight =
             inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
 
-        if (dupe.isPresent() || inflight.isPresent()) {
+        if (dupe.isPresent()) {
             log.debug("Duplicated unsent offset fetch request found for partitions: {}", request.requestedPartitions);
-            dupe.orElseGet(inflight::get).chainFuture(request.future);
+            dupe.get().chainFuture(request.future);
+        } else if (inflight.isPresent()) {

Review Comment:
   @AndrewJSchofield, I totally agree.
   
   The initial pass was my attempt to shoehorn in a fix while disturbing the surrounding code as little as possible. I wanted to keep things as localized as possible, given the deadline.
   
   I am working on an approach that is hopefully more sane.






Re: [PR] KAFKA-16924: add slf4jlog4j dependey in tool [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on PR #16260:
URL: https://github.com/apache/kafka/pull/16260#issuecomment-2159632052

   > That said, slf4j 2.0 moves to a different approach that seems much simpler:
   
   That is an interesting suggestion. In order to address it, we need to complete the following tasks:
   
   1. upgrade slf4j from 1.7.36 to 2.0.9+
   2. add a new system variable to the scripts to define `-Dslf4j.provider` easily; by default we use `org.slf4j.reload4j.Reload4jServiceProvider`
   3. add the slf4j backend dependencies
   
   @ijuma WDYT?
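   
   A minimal sketch (assuming slf4j 2.0.9+ and slf4j-reload4j on the classpath; the class here is a hypothetical entry point, not Kafka code) of selecting the provider explicitly:
   
   ```
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class ProviderSelectionExample {
       public static void main(String[] args) {
           // Must run before the first LoggerFactory call; equivalent to
           // passing -Dslf4j.provider=... on the command line.
           System.setProperty("slf4j.provider",
                   "org.slf4j.reload4j.Reload4jServiceProvider");
           Logger log = LoggerFactory.getLogger(ProviderSelectionExample.class);
           log.info("SLF4J provider selected explicitly");
       }
   }
   ```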





[jira] (KAFKA-15630) Improve documentation of offset.lag.max

2024-06-10 Thread Ganesh Sadanala (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15630 ]


Ganesh Sadanala deleted comment on KAFKA-15630:
-

was (Author: JIRAUSER305566):
[~mimaison] PR raised: [https://github.com/apache/kafka/pull/16080] Please review it.

> Improve documentation of offset.lag.max
> ----------------------------------------
>
>                 Key: KAFKA-15630
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15630
>             Project: Kafka
>          Issue Type: Improvement
>          Components: docs, mirrormaker
>            Reporter: Mickael Maison
>            Priority: Major
>              Labels: newbie
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> It would be good to expand on the role of this configuration on offset 
> translation and mention that it can be set to a smaller value, or even 0, to 
> help in scenarios when records may not flow constantly.
> The documentation string is here: 
> [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104]
>  
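
A hedged sketch for illustration (the property name comes from the description above; the class and the value choice are hypothetical examples, not documented guidance):

```
import java.util.Properties;

public class MirrorOffsetLagExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Smaller values keep offset translation close even when records
        // flow rarely; 0 translates offsets as eagerly as possible.
        props.put("offset.lag.max", "0");
        System.out.println(props);
    }
}
```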





[jira] [Commented] (KAFKA-16929) Conside defining kafka-specified assertion to unify testing style

2024-06-10 Thread Chia-Ping Tsai (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853857#comment-17853857 ]

Chia-Ping Tsai commented on KAFKA-16929:


{quote}
Unifying the way of assertions is a valuable thing, but is it really necessary to develop an assertion component?

If you are developing assertions for Kafka, do you want to disable hamcrest and junit?
{quote}

My point was to unify the "usage", so I prefer to reuse one of the existing assertion tools to implement the Kafka assertions. That is why I said most changes would be to replace the imports from junit/hamcrest with org.apache.kafka.Assertions.

 






[jira] [Commented] (KAFKA-16929) Conside defining kafka-specified assertion to unify testing style

2024-06-10 Thread Ismael Juma (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853856#comment-17853856 ]

Ismael Juma commented on KAFKA-16929:
-

We could consider something like https://docs.openrewrite.org/recipes/java/testing/assertj/junittoassertj if we think assertj (or hamcrest) is better than the built-in assertions.






[jira] [Commented] (KAFKA-16929) Conside defining kafka-specified assertion to unify testing style

2024-06-10 Thread Ismael Juma (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853855#comment-17853855 ]

Ismael Juma commented on KAFKA-16929:
-

"third-party assertion does not offer good error message, so we need to use 
non-common style to get useful output 
[https://github.com/apache/kafka/pull/16253#discussion_r1633406693;|https://github.com/apache/kafka/pull/16253#discussion_r1633406693)]

Why don't we contribute to the upstream project and fix this for everyone?






[jira] [Commented] (KAFKA-16929) Conside defining kafka-specified assertion to unify testing style

2024-06-10 Thread dujian0068 (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853851#comment-17853851 ]

dujian0068 commented on KAFKA-16929:


hello

Unifying the way of assertions is a valuable thing, but is it really necessary to develop an assertion component?

If you are developing assertions for Kafka, do you want to disable hamcrest and junit?




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16924: add slf4jlog4j dependency in tool [kafka]

2024-06-10 Thread via GitHub


ijuma commented on PR #16260:
URL: https://github.com/apache/kafka/pull/16260#issuecomment-2159577302

   The problem with this approach is that it forces the choice of logging 
library for users that use `tools` as a library, which is inconsistent with the 
approach we picked for the `core` module a long while back (check git blame to 
see the details).
   
   That said, slf4j 2.0 moves to a different approach that seems much simpler:
   
   > SINCE 2.0.9 You can specify the provider class explicitly via the 
"slf4j.provider" system property. This bypasses the service loader mechanism 
for finding providers and may shorten SLF4J initialization.
   
   So, perhaps we can give up all these brittle build tricks and have a strong 
dependency on the slf4j backend we care about. The user can easily override it 
via a config.
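
   For example, with SLF4J >= 2.0.9 a user could pick a different backend at startup without any build changes (the provider class below is the slf4j-simple one, shown purely as an illustration):
   
   ```
   export KAFKA_OPTS="-Dslf4j.provider=org.slf4j.simple.SimpleServiceProvider"
   bin/kafka-server-start.sh config/kraft/controller.properties
   ```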


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #16255:
URL: https://github.com/apache/kafka/pull/16255#discussion_r1634021872


##
clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_NUM_PARTITIONS;
+import static 
org.apache.kafka.common.requests.CreateTopicsRequest.NO_REPLICATION_FACTOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class NewTopicTest {
+
+public static final String TEST_TOPIC = "testtopic";
+public static final int NUM_PARTITIONS = 3;
+public static final short REPLICATION_FACTOR = 1;
+public static final String CLEANUP_POLICY_CONFIG_KEY = "cleanup.policy";
+public static final String CLEANUP_POLICY_CONFIG_VALUE = "compact";
+public static final List<Integer> BROKER_IDS = Arrays.asList(1, 2);
+
+@Test
+public void testConstructorWithPartitionsAndReplicationFactor() {
+NewTopic topic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, 
REPLICATION_FACTOR);
+assertEquals(TEST_TOPIC, topic.name());
+assertEquals(NUM_PARTITIONS, topic.numPartitions());
+assertEquals(REPLICATION_FACTOR, topic.replicationFactor());
+assertNull(topic.replicasAssignments());
+}
+
+@Test
+public void testConstructorWithOptionalValues() {
+Optional<Integer> numPartitions = Optional.empty();
+Optional<Short> replicationFactor = Optional.empty();
+NewTopic topic = new NewTopic(TEST_TOPIC, numPartitions, 
replicationFactor);
+assertEquals(TEST_TOPIC, topic.name());
+assertEquals(NO_NUM_PARTITIONS, topic.numPartitions());
+assertEquals(NO_REPLICATION_FACTOR, topic.replicationFactor());
+assertNull(topic.replicasAssignments());
+}
+
+@Test
+public void testConstructorWithReplicasAssignments() {
+Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
+replicasAssignments.put(0, BROKER_IDS);
+NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments);
+assertEquals(TEST_TOPIC, newTopic.name());
+assertEquals(NO_NUM_PARTITIONS, newTopic.numPartitions());
+assertEquals(NO_REPLICATION_FACTOR, newTopic.replicationFactor());
+assertEquals(replicasAssignments, newTopic.replicasAssignments());
+}
+
+@Test
+public void testConfigsNotNull() {
+NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, 
REPLICATION_FACTOR);
+Map<String, String> configs = new HashMap<>();
+configs.put(CLEANUP_POLICY_CONFIG_KEY, CLEANUP_POLICY_CONFIG_VALUE);
+newTopic.configs(configs);
+assertEquals(configs, newTopic.configs());
+}
+
+@Test
+public void testConfigsNull() {
+NewTopic newTopic = new NewTopic(TEST_TOPIC, NUM_PARTITIONS, 
REPLICATION_FACTOR);
+assertNull(newTopic.configs());
+}
+
+@Test
+public void testUnmodifiableReplicasAssignments() {
+Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
+replicasAssignments.put(0, BROKER_IDS);
+NewTopic newTopic = new NewTopic(TEST_TOPIC, replicasAssignments);
+Map<Integer, List<Integer>> returnedAssignments = newTopic.replicasAssignments();
+
+assertThrows(UnsupportedOperationException.class, () -> {

Review Comment:
   it seems this can be simplified to 
`assertThrows(UnsupportedOperationException.class, () -> 
returnedAssignments.put(1, Arrays.asList(3, 4)));`



##
clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation 

Re: [PR] KAFKA-16924: add slf4jlog4j dependency in tool [kafka]

2024-06-10 Thread via GitHub


showuon commented on PR #16260:
URL: https://github.com/apache/kafka/pull/16260#issuecomment-2159575190

   Thanks for the review. I've merged it to make `./gradlew jar` work as before. We can always improve it later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16929) Consider defining kafka-specified assertion to unify testing style

2024-06-10 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16929:
---
Issue Type: Wish  (was: New Feature)




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16929) Consider defining kafka-specified assertion to unify testing style

2024-06-10 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853847#comment-17853847
 ] 

Chia-Ping Tsai commented on KAFKA-16929:


[~ijuma] thanks for your response! 

{quote}

Isn't there an existing library that does this? It seems hard to believe. 

{quote}

I haven't searched for one yet, but another value of our own assertion library is that we can make minimal changes to the code base. BTW, I was thinking `Hamcrest` is a good alternative and it is flexible, but most code uses the junit style and it seems Hamcrest for Java is inactive.

{quote}

 I am not sure we should be building our own assertion library (it's one more 
thing people will have to learn) unless it's really true that there isn't 
something out there that solves this issue.

{quote}

IMO, the collection of our assertions would cover the most common use cases in the code base. Hence, the learning curve is gentle and most of the migration can be done by replacing the imports.
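
For example, if the new module kept junit-style method names, much of the migration could be a one-line import swap (the package below is hypothetical):

{code:java}
// before
import static org.junit.jupiter.api.Assertions.assertEquals;
// after
import static org.apache.kafka.common.test.KafkaAssertions.assertEquals;
{code}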




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Add time out in assertFutureThrows [kafka]

2024-06-10 Thread via GitHub


showuon commented on code in PR #16264:
URL: https://github.com/apache/kafka/pull/16264#discussion_r1634018574


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -556,10 +557,15 @@ public static Set 
generateRandomTopicPartitions(int numTopic, in
  * @return The caught exception cause
  */
 public static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> exceptionCauseClass) {
-ExecutionException exception = assertThrows(ExecutionException.class, 
future::get);
-assertInstanceOf(exceptionCauseClass, exception.getCause(),
-"Unexpected exception cause " + exception.getCause());
-return exceptionCauseClass.cast(exception.getCause());
+try {
+future.get(5, TimeUnit.SECONDS);
+ExecutionException exception = 
assertThrows(ExecutionException.class, future::get);
+assertInstanceOf(exceptionCauseClass, exception.getCause(),
+"Unexpected exception cause " + exception.getCause());
+return exceptionCauseClass.cast(exception.getCause());
+} catch (Exception ignored) {
+return null;
+}

Review Comment:
   I saw there's another `future.get` without timeout in L583. Please also fix 
it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add time out in assertFutureThrows [kafka]

2024-06-10 Thread via GitHub


showuon commented on code in PR #16264:
URL: https://github.com/apache/kafka/pull/16264#discussion_r1634016604


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -556,10 +557,15 @@ public static Set 
generateRandomTopicPartitions(int numTopic, in
  * @return The caught exception cause
  */
 public static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> exceptionCauseClass) {
-ExecutionException exception = assertThrows(ExecutionException.class, 
future::get);
-assertInstanceOf(exceptionCauseClass, exception.getCause(),
-"Unexpected exception cause " + exception.getCause());
-return exceptionCauseClass.cast(exception.getCause());
+try {
+future.get(5, TimeUnit.SECONDS);
+ExecutionException exception = 
assertThrows(ExecutionException.class, future::get);
+assertInstanceOf(exceptionCauseClass, exception.getCause(),
+"Unexpected exception cause " + exception.getCause());
+return exceptionCauseClass.cast(exception.getCause());
+} catch (Exception ignored) {
+return null;
+}

Review Comment:
   Before this change, when future.get throws "non-ExecutionException", it'll 
fail. But after the change, it'll pass. This changes the original semantics. I 
think we can use `catch` to assert what we expected. Ex:
   
   ```java
   try {
       future.get(5, TimeUnit.SECONDS);
       fail("expected to throw ExecutionException...");
   } catch (TimeoutException e) {
       fail("timeout waiting");
   } catch (ExecutionException e) {
       // assert on e.getCause() here
   } catch (InterruptedException e) {
       fail("interrupted while waiting");
   }
   ```
   
   WDYT?



##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -556,10 +557,15 @@ public static Set 
generateRandomTopicPartitions(int numTopic, in
  * @return The caught exception cause
  */
 public static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> exceptionCauseClass) {
-ExecutionException exception = assertThrows(ExecutionException.class, 
future::get);
-assertInstanceOf(exceptionCauseClass, exception.getCause(),
-"Unexpected exception cause " + exception.getCause());
-return exceptionCauseClass.cast(exception.getCause());
+try {
+future.get(5, TimeUnit.SECONDS);
+ExecutionException exception = 
assertThrows(ExecutionException.class, future::get);
+assertInstanceOf(exceptionCauseClass, exception.getCause(),
+"Unexpected exception cause " + exception.getCause());
+return exceptionCauseClass.cast(exception.getCause());
+} catch (Exception ignored) {
+return null;
+}

Review Comment:
   I saw there's another `future.get` without timeout in L583. Please also fix it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16895: fix off-by-one bug in RemoteCopyLagSegments [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #16210:
URL: https://github.com/apache/kafka/pull/16210#discussion_r163401


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -1018,8 +1018,8 @@ void testRemoteLogManagerRemoteMetrics() throws Exception 
{
 safeLongYammerMetricValue("RemoteCopyLagBytes,topic=" 
+ leaderTopic),
 safeLongYammerMetricValue("RemoteCopyLagBytes")));
 TestUtils.waitForCondition(
-() -> 1 == safeLongYammerMetricValue("RemoteCopyLagSegments") 
&& 1 == safeLongYammerMetricValue("RemoteCopyLagSegments,topic=" + leaderTopic),
-String.format("Expected to find 1 for RemoteCopyLagSegments 
metric value, but found %d for topic 'Leader' and %d for all topics.",
+() -> 0 == safeLongYammerMetricValue("RemoteCopyLagSegments") 
&& 0 == safeLongYammerMetricValue("RemoteCopyLagSegments,topic=" + leaderTopic),

Review Comment:
   This is a bit weird to me, as the value of `RemoteCopyLagBytes` is not zero. 
Maybe we should change line#1003 to return `2` instead of `1`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16929) Consider defining kafka-specified assertion to unify testing style

2024-06-10 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853845#comment-17853845
 ] 

Ismael Juma commented on KAFKA-16929:
-

Isn't there an existing library that does this? It seems hard to believe. I am 
not sure we should be building our own assertion library (it's one more thing 
people will have to learn) unless it's really true that there isn't something 
out there that solves this issue.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16924) No log output when running kafka

2024-06-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16924.
---
Resolution: Fixed

> No log output when running kafka 
> -
>
> Key: KAFKA-16924
> URL: https://issues.apache.org/jira/browse/KAFKA-16924
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 4.0.0
>
>
> In [https://github.com/apache/kafka/pull/12148], we removed the log4jAppender dependency and added a testImplementation dependency for the `slf4jlog4j` lib. However, we need this as a runtime dependency in the tools module to output logs ([ref]([https://stackoverflow.com/a/21787813])). This adds the dependency back.
>  
> Note: The {{slf4jlog4j}} lib was pulled in by the {{log4j-appender}} dependency. Since that has been removed, we need to declare it explicitly.
>  
> Current output will be like this:
> {code:java}
> > ./gradlew clean jar
> > bin/kafka-server-start.sh config/kraft/controller.properties
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.{code}
>  
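
A minimal sketch of the kind of build change described above, assuming the `libs.slf4jlog4j` version-catalog alias the Kafka build already defines for this library:

{code:groovy}
// build.gradle, tools module - illustrative sketch only
project(':tools') {
  dependencies {
    // restore the SLF4J backend at runtime so the CLI tools can emit logs
    runtimeOnly libs.slf4jlog4j
  }
}
{code}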



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16924: add slf4jlog4j dependency in tool [kafka]

2024-06-10 Thread via GitHub


showuon merged PR #16260:
URL: https://github.com/apache/kafka/pull/16260


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16921: Migrate all junit 4 code to junit 5 for connect module (part 1) [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #16253:
URL: https://github.com/apache/kafka/pull/16253#discussion_r1634010611


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##
@@ -1508,25 +1545,25 @@ private List 
allocations(Function
 assertEquals(
-"Expected no revocations to take place during this 
round, but connector revocations were issued for worker " + worker,
 Collections.emptySet(),
-new HashSet<>(revocations)
+new HashSet<>(revocations),
+"Expected no revocations to take place during this 
round, but connector revocations were issued for worker " + worker
 )
 );
 returnedAssignments.newlyRevokedTasks().forEach((worker, revocations) 
->
 assertEquals(
-"Expected no revocations to take place during this 
round, but task revocations were issued for worker " + worker,
 Collections.emptySet(),
-new HashSet<>(revocations)

Review Comment:
   > With rudimentary checking logic (searching for 
assertEquals(Collections.empty in the code base), I also see this pattern used 
94 times across 16 files in the Connect modules, so it's not exactly uncommon. 
IMO it'd be better to make it more common in other parts of the code base 
rather than try to "simplify" things in a way that drops information on 
assertion failures.
   
   I filed https://issues.apache.org/jira/browse/KAFKA-16929 to have more discussion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16929) Consider defining kafka-specified assertion to unify testing style

2024-06-10 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16929:
---
Description: 
There are many contributors trying to fix the chaos of Kafka testing. That includes the following huge pieces of work:
 # replace powermock/easymock by mockito (KAFKA-7438)
 # replace junit 4 assertion by junit 5 (KAFKA-7339)

It took us 6 years to complete the migration for task 1. The second task is in progress and I hope it can be addressed in 4.0.0.

When reviewing I noticed there are many different tastes in the code base. That is why task 1 was so difficult to rewrite. Now, the rewriting of "assertion" is facing the same issue, and I feel the usage of "assertion" is even more awkward than "mockito" for the following reasons:
 # there are two "different" assertion styles in the code base - hamcrest and junit - which is confusing to developers
 # 
[https://github.com/apache/kafka/pull/15730#discussion_r1567676845|https://github.com/apache/kafka/pull/15730#discussion_r1567676845)]
 # third-party assertions do not offer good error messages, so we need to use a non-common style to get useful output 
[https://github.com/apache/kafka/pull/16253#discussion_r1633406693|https://github.com/apache/kafka/pull/16253#discussion_r1633406693)]

IMHO, we should consider having our own kafka-specified assertion style. That can bring the following benefits:
 # unify the assertion style of the whole project
 # apply customized assertions, for example:
 ## assertEqual(List, List, F)
 ## assertTrue(Supplier, Duration) - equal to `TestUtils.waitForCondition`
 # auto-generate useful error messages. For example: assertEqual(0, list) -> print the list

In short, I'd like to add a new module to define common assertions, and then apply it to the code base slowly.

All feedback/responses/objections are welcomed :)

 

  was:
There are many contributors trying to fix the chaos of Kafka testing. That includes the following huge pieces of work:
 # replace powermock/easymock by mockito (KAFKA-7438)
 # replace junit 4 assertion by junit 5 (KAFKA-7339)

It took us 6 years to complete the migration for task 1. The second task is in progress and I hope it can be addressed in 4.0.0.

When reviewing I noticed there are many different tastes in the code base. That is why task 1 was so difficult to rewrite. Now, the rewriting of "assertion" is facing the same issue, and I feel the usage of "assertion" is even more awkward than "mockito" for the following reasons:
 # there are two "different" assertion styles in the code base - hamcrest and junit - which is confusing to developers ([https://github.com/apache/kafka/pull/15730#discussion_r1567676845|https://github.com/apache/kafka/pull/15730#discussion_r1567676845)]
 # third-party assertions do not offer good error messages, so we need to use a non-common style to get useful output ([https://github.com/apache/kafka/pull/16253#discussion_r1633406693|https://github.com/apache/kafka/pull/16253#discussion_r1633406693)]

IMHO, we should consider having our own kafka-specified assertion style. That can bring the following benefits:
 # unify the assertion style of the whole project
 # apply customized assertions, for example:
 ## assertEqual(List, List, F)
 ## assertTrue(Supplier, Duration) - equal to `TestUtils.waitForCondition`
 # auto-generate useful error messages. For example: assertEqual(0, list) -> print the list

In short, I'd like to add a new module to define common assertions, and then apply it to the code base slowly.

All feedback/responses/objections are welcomed :)

 



[jira] [Updated] (KAFKA-16929) Consider defining kafka-specified assertion to unify testing style

2024-06-10 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16929:
---
Description: 
There are many contributors trying to fix the chaos of Kafka testing. That includes the following huge pieces of work:
 # replace powermock/easymock by mockito (KAFKA-7438)
 # replace junit 4 assertion by junit 5 (KAFKA-7339)

It took us 6 years to complete the migration for task 1. The second task is in progress and I hope it can be addressed in 4.0.0.

When reviewing I noticed there are many different tastes in the code base. That is why task 1 was so difficult to rewrite. Now, the rewriting of "assertion" is facing the same issue, and I feel the usage of "assertion" is even more awkward than "mockito" for the following reasons:
 # there are two "different" assertion styles in the code base - hamcrest and junit - which is confusing to developers ([https://github.com/apache/kafka/pull/15730#discussion_r1567676845|https://github.com/apache/kafka/pull/15730#discussion_r1567676845)]
 # third-party assertions do not offer good error messages, so we need to use a non-common style to get useful output ([https://github.com/apache/kafka/pull/16253#discussion_r1633406693|https://github.com/apache/kafka/pull/16253#discussion_r1633406693)]

IMHO, we should consider having our own kafka-specified assertion style. That can bring the following benefits:
 # unify the assertion style of the whole project
 # apply customized assertions, for example:
 ## assertEqual(List, List, F)
 ## assertTrue(Supplier, Duration) - equal to `TestUtils.waitForCondition`
 # auto-generate useful error messages. For example: assertEqual(0, list) -> print the list

In short, I'd like to add a new module to define common assertions, and then apply it to the code base slowly.

All feedback/responses/objections are welcomed :)

 

  was:
There are many contributors trying to fix the chaos of Kafka testing. That includes the following huge pieces of work:
 # replace powermock/easymock by mockito (KAFKA-7438)
 # replace junit 4 assertion by junit 5 (KAFKA-7339)

It took us 6 years to complete the migration for task 1. The second task is in progress and I hope it can be addressed in 4.0.0.

When reviewing I noticed there are many different tastes in the code base. That is why task 1 was so difficult to rewrite. Now, the rewriting of "assertion" is facing the same issue, and I feel the usage of "assertion" is even more awkward than "mockito" for the following reasons:
 # there are two "different" assertion styles in the code base - hamcrest and junit - which is confusing to developers ([https://github.com/apache/kafka/pull/15730#discussion_r1567676845)]
 # third-party assertions do not offer good error messages, so we need to use a non-common style to get useful output ([https://github.com/apache/kafka/pull/16253#discussion_r1633406693)]

IMHO, we should consider having our own kafka-specified assertion style. That can bring the following benefits:
 # unify the assertion style of the whole project
 # apply customized assertions, for example:
 ## assertEqual(List, List, F)
 ## assertTrue(Supplier, Duration) - equal to `TestUtils.waitForCondition`
 # auto-generate useful error messages. For example: assertEqual(0, list) -> print the list

In short, I'd like to add a new module to define common assertions, and then apply it to the code base slowly.

All feedback/responses/objections are welcomed :)

 



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2159551660

   Thanks for the review, @lucasbru.
   Thank you again for taking the time to review. @philipnee @lianetm @kirktrue 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1634006443


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -199,4 +234,4 @@ public void prepareFindCoordinatorResponse(Errors error) {
 private Node mockNode() {
 return new Node(0, "localhost", 99);
 }
-}
+}

Review Comment:
   I got it.
   thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1634004534


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,155 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords<Integer, String> records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic));
+Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();
 
-Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
-Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord<Integer, String> record = iter.next();
-assertEquals(1, record.partition());
-assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+while (iterator.hasNext()) {
+ConsumerRecord<Integer, String> record = iterator.next();
+validateEmptyPartition(record, emptyPartitionIndex);
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
+
+validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+recordCount++;
+}
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsByPartition() {
+List topics = Arrays.asList("topic1", "topic2");
+int recordSize = 3;
+int partitionSize = 5;
+int emptyPartitionIndex = 2;
+
+ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+for (int partition = 0; partition < partitionSize; partition++) {
+TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+List<ConsumerRecord<Integer, String>> records = consumerRecords.records(topicPartition);
+
+if (partition == emptyPartitionIndex) {
+assertTrue(records.isEmpty());
+} else {
+assertEquals(recordSize, records.size());
+for (int i = 0; i < records.size(); i++) {
+ConsumerRecord<Integer, String> record = records.get(i);
+validateRecordPayload(topic, record, partition, i, 
recordSize);
+}
+}
+}
 }
-assertEquals(2, c);
+}
+
+@Test
+public void testRecordsByNullTopic() {
+String nullTopic = null;
+ConsumerRecords<Integer, String> consumerRecords = ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecordsByTopic() {
+List topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+int recordSize = 3;
+int partitionSize = 10;
+int emptyPartitionIndex = 6;
+int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+

[jira] [Commented] (KAFKA-16929) Consider defining kafka-specified assertion to unify testing style

2024-06-10 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853841#comment-17853841
 ] 

PoAn Yang commented on KAFKA-16929:
---

Hi [~chia7712], thanks for the good suggestion. If this issue is valid, I can help contribute it.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #16231:
URL: https://github.com/apache/kafka/pull/16231#discussion_r1633997795


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Threads(2)
+public class ConcurrentMapBenchmark {
+private static final int TIMES = 1000_000;
+
+@Param({"100"})
+private int mapSize;
+
+@Param({"0.1"})
+private double writePercentage;
+
+private Map<Integer, Integer> concurrentHashMap;
+private Map<Integer, Integer> copyOnWriteMap;
+private int writePerLoops;
+
+@Setup
+public void setup() {
+Map<Integer, Integer> mapTemplate = IntStream.range(0, mapSize).boxed()
+.collect(Collectors.toMap(i -> i, i -> i));
+concurrentHashMap = new ConcurrentHashMap<>(mapTemplate);
+copyOnWriteMap = new CopyOnWriteMap<>(mapTemplate);
+writePerLoops = TIMES / (int) Math.round(writePercentage * TIMES);
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testConcurrentHashMapGet(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+blackhole.consume(concurrentHashMap.get(i % mapSize));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testConcurrentHashMapGetRandom(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+
blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0, 
mapSize + 1)));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testCopyOnWriteMapGet(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+blackhole.consume(copyOnWriteMap.get(i % mapSize));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testCopyOnWriteMapGetRandom(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+
blackhole.consume(copyOnWriteMap.get(ThreadLocalRandom.current().nextInt(0, 
mapSize + 1)));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+   

[jira] [Created] (KAFKA-16929) Consider defining kafka-specified assertion to unify testing style

2024-06-10 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16929:
--

 Summary: Consider defining kafka-specified assertion to unify testing style
 Key: KAFKA-16929
 URL: https://issues.apache.org/jira/browse/KAFKA-16929
 Project: Kafka
  Issue Type: New Feature
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


There are many contributors trying to fix the chaos of Kafka testing. That includes the following huge pieces of work:
 # replace powermock/easymock by mockito (KAFKA-7438)
 # replace junit 4 assertion by junit 5 (KAFKA-7339)

It took us 6 years to complete the migration for task 1. The second task is in progress and I hope it can be addressed in 4.0.0.

When reviewing I noticed there are many different tastes in the code base. That is why task 1 was so difficult to rewrite. Now, the rewriting of "assertion" is facing the same issue, and I feel the usage of "assertion" is even more awkward than "mockito" for the following reasons:
 # there are two "different" assertion styles in the code base - hamcrest and junit - which is confusing to developers (https://github.com/apache/kafka/pull/15730#discussion_r1567676845)
 # third-party assertions do not offer good error messages, so we need to use a non-common style to get useful output (https://github.com/apache/kafka/pull/16253#discussion_r1633406693)

IMHO, we should consider having our own kafka-specified assertion style. That can bring the following benefits:
 # unify the assertion style of the whole project
 # apply customized assertions, for example:
 ## assertEqual(List, List, F)
 ## assertTrue(Supplier, Duration) - equal to `TestUtils.waitForCondition`
 # auto-generate useful error messages. For example: assertEqual(0, list) -> print the list

In short, I'd like to add a new module to define common assertions, and then apply it to the code base slowly.

All feedback/responses/objections are welcomed :)

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16752: Implemented acquire functionality for Fetch (KIP-932) [kafka]

2024-06-10 Thread via GitHub


apoorvmittal10 opened a new pull request, #16274:
URL: https://github.com/apache/kafka/pull/16274

   The implementation of the share-fetch `next-fetch-offset` in the share partition, and of acquiring records from the log.
   
   The Next Fetch Offset (NFO) determines where the Share Partition should initiate the next data read from the Replica Manager. It typically aligns with the last offset of the most recently returned batch plus one, but there are exceptions: messages marked available again due to release acknowledgements or lock timeouts can cause the NFO to shift.
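
   As a rough sketch of that rule (hypothetical names, not the PR's actual code), the NFO is the earliest re-available offset when one exists, and the end of the last acquired batch plus one otherwise:
   
   ```java
   // Illustrative only: a released/timed-out offset wins, else continue past the last acquired batch.
   long nextFetchOffset(java.util.SortedSet<Long> reAvailableOffsets, long lastAcquiredEndOffset) {
       return reAvailableOffsets.isEmpty() ? lastAcquiredEndOffset + 1 : reAvailableOffsets.first();
   }
   ```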
   
   The acquire method caches the batches as acquired in-memory and spawns a 
timer task for lock timeout.
   
   ### Cache
   1. `Per-offset Metadata`: Simple to implement but inefficient. Every offset 
requires in-memory storage and traversal, leading to high memory usage and 
processing overhead, especially for per-batch acknowledgements (mostly the way 
records would be acknowledged).
   
   2. `Per-Replica Fetch Batch`: This approach aligns with the Replica Manager 
fetch batches. Since a full Replica Manager batch is retrieved whenever the 
requested offset falls within that batch's boundaries, a single Share Fetch 
request will likely receive an entire Replica Manager batch. However, there's a 
trade-off. Replica Manager batches are based on producer batching. If producers 
don't batch effectively, the in-flight metadata becomes heavily reliant on the 
producer's batching behavior.
   
   For per-message acknowledgements, per-offset tracking will be necessary, which again requires splitting in-flight batches based on state. Splitting batches is inefficient as it requires updating the cache, which maintains sorted order. Therefore, we propose a hybrid approach:
   
   Implemented a combination of option 2 (per-in-flight batch tracking) with 
option 1 (per-offset tracking). This aligns well with Replica Manager batching.
   
   States shall be maintained per in-flight batch. If state inconsistencies 
arise within in-flight batches due to per-message acknowledgements, switch 
state tracking for the respective batch to option 1 (per-offset tracking).
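
   A minimal sketch of the hybrid in-flight state described above (illustrative names, not the PR's actual classes):
   
   ```java
   // One entry per acquired Replica Manager batch; per-offset state is only
   // materialized when acknowledgements split the batch's uniform state.
   class InFlightBatch {
       final long baseOffset;
       final long lastOffset;
       RecordState batchState;                                 // valid while the whole batch shares one state
       java.util.NavigableMap<Long, RecordState> offsetState;  // lazily created on per-offset acknowledgement
   
       InFlightBatch(long baseOffset, long lastOffset) {
           this.baseOffset = baseOffset;
           this.lastOffset = lastOffset;
           this.batchState = RecordState.ACQUIRED;
       }
   }
   
   enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }
   ```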
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16520 KIP-853 Changes to DescribeQuorum request/response [kafka]

2024-06-10 Thread via GitHub


cmccabe commented on code in PR #16106:
URL: https://github.com/apache/kafka/pull/16106#discussion_r1633905881


##
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##
@@ -29,19 +32,22 @@ public class QuorumInfo {
 private final long highWatermark;
 private final List<ReplicaState> voters;
 private final List<ReplicaState> observers;
+private final Map<Integer, Node> nodes;

Review Comment:
   It's easier for users this way. The extra int is not a major burden since 
the map is so small.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 21) UUID to ProcessId migration [kafka]

2024-06-10 Thread via GitHub


ableegoldman commented on code in PR #16269:
URL: https://github.com/apache/kafka/pull/16269#discussion_r1633870091


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -920,8 +919,8 @@ private boolean populateClientStatesMap(final Map clientState
 fetchEndOffsetsSuccessful = false;
 }
 
-for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
-final UUID uuid = entry.getKey();
+for (final Map.Entry<ProcessId, ClientMetadata> entry : clientMetadataMap.entrySet()) {
+final ProcessId uuid = entry.getKey();

Review Comment:
   nit: rename variable to `processId`



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -216,23 +215,13 @@ public static Map 
optimizeRackAwareActiveTask
 .filter(taskInfo -> tasks.contains(taskInfo.id()))
 .collect(Collectors.toMap(TaskInfo::id, 
TaskInfo::topicPartitions));
 
-final Map<UUID, Optional<String>> clientRacks = new HashMap<>();
-final List<UUID> clientIds = new ArrayList<>();
-final Map<UUID, KafkaStreamsAssignment> assignmentsByUuid = new HashMap<>();
-
-for (final Map.Entry<ProcessId, KafkaStreamsAssignment> entry : kafkaStreamsAssignments.entrySet()) {
-final UUID uuid = entry.getKey().id();
-clientIds.add(uuid);
-clientRacks.put(uuid, kafkaStreamsStates.get(entry.getKey()).rackId());
-assignmentsByUuid.put(uuid, entry.getValue());
-}
-
+final List<ProcessId> clientIds = new ArrayList<>(kafkaStreamsStates.keySet());

Review Comment:
   nice! so much simpler  



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -775,15 +746,15 @@ private static void 
assignStandbyTasksToClientsWithDifferentTags(final int numbe
 tagEntryToUsedClients
 );
 
-final UUID clientOnUnusedTagDimensions = 
standbyTaskClientsByTaskLoad.poll(
-activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(new 
ProcessId(uuid), tagEntryToUsedClients)
+final ProcessId clientOnUnusedTagDimensions = 
standbyTaskClientsByTaskLoad.poll(
+activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid, 
tagEntryToUsedClients)

Review Comment:
   ```suggestion
   activeTaskId, processId -> 
!isClientUsedOnAnyOfTheTagEntries(processId, tagEntryToUsedClients)
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -373,14 +352,14 @@ public static Map 
optimizeRackAwareStandbyTas
 taskMoved = false;
 round++;
 for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
-final UUID clientId1 = clientIds.get(i);
-final KafkaStreamsAssignment clientState1 = kafkaStreamsAssignments.get(new ProcessId(clientId1));
+final ProcessId clientId1 = clientIds.get(i);
+final KafkaStreamsAssignment clientState1 = kafkaStreamsAssignments.get(clientId1);
 for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
-final UUID clientId2 = clientIds.get(j);
-final KafkaStreamsAssignment clientState2 = kafkaStreamsAssignments.get(new ProcessId(clientId2));
+final ProcessId clientId2 = clientIds.get(j);
+final KafkaStreamsAssignment clientState2 = kafkaStreamsAssignments.get(clientId2);

Review Comment:
   ditto here: `clientState2` --> `clientAssignment2`



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -893,16 +864,16 @@ private static MoveStandbyTaskPredicate 
getStandbyTaskMovePredicate(final Applic
 private static ConstrainedPrioritySet standbyTaskPriorityListByLoad(final Map<ProcessId, KafkaStreamsState> streamStates,
 final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
 return new ConstrainedPrioritySet(
-(processId, taskId) -> !kafkaStreamsAssignments.get(new ProcessId(processId)).tasks().containsKey(taskId),
+(processId, taskId) -> !kafkaStreamsAssignments.get(processId).tasks().containsKey(taskId),
 processId -> {
-final double capacity = streamStates.get(new ProcessId(processId)).numProcessingThreads();
-final double numTasks = kafkaStreamsAssignments.get(new ProcessId(processId)).tasks().size();
+final double capacity = streamStates.get(processId).numProcessingThreads();
+final double numTasks = kafkaStreamsAssignments.get(processId).tasks().size();
 return numTasks / capacity;
 }
 );
 }
 
-private static void assignPendingStandbyTasksToLeastLoadedClients(final 
Map clients,
+private static void 

[jira] [Commented] (KAFKA-16225) Flaky test suite LogDirFailureTest

2024-06-10 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853823#comment-17853823
 ] 

Greg Harris commented on KAFKA-16225:
-

This PR: [https://github.com/apache/kafka/pull/15335] for KAFKA-16234 appears 
to have substantially decreased the flakiness in this test suite: !Screenshot 
2024-06-10 at 2.39.54 PM.png! Thanks [~omnia_h_ibrahim] and all of the 
reviewers on that fix! Some minor flakiness remains with the following 4 errors 
in the past 28 days on trunk:
{noformat}
org.opentest4j.AssertionFailedError: Timeout waiting for controller metadata 
propagating to brokers
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
at org.junit.jupiter.api.Assertions.fail(Assertions.java:138)
at kafka.utils.TestUtils$.ensureConsistentKRaftMetadata(TestUtils.scala:927)
at 
kafka.integration.KafkaServerTestHarness.ensureConsistentKRaftMetadata(KafkaServerTestHarness.scala:422)
at 
kafka.server.LogDirFailureTest.setUp(LogDirFailureTest.scala:60){noformat}
{noformat}
org.opentest4j.AssertionFailedError: Consumed 0 records before timeout instead 
of the expected 1 records
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
at org.junit.jupiter.api.Assertions.fail(Assertions.java:138)
at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:927)
at 
kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:200)
at 
kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:72)
{noformat}
{noformat}
java.nio.file.FileAlreadyExistsException: 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/core/data/kafka-1608887280612041858
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.createFile(Files.java:632)
at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1360)
at 
kafka.server.LogDirFailureTest.testProduceErrorsFromLogDirFailureOnLeader(LogDirFailureTest.scala:191)
at 
kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnCheckpoint(LogDirFailureTest.scala:137){noformat}
{noformat}
java.nio.file.FileAlreadyExistsException: 
/home/jenkins/workspace/Kafka_kafka_trunk/core/data/kafka-13669229279446172937
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:94)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at 
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
at java.nio.file.Files.newByteChannel(Files.java:371)
at java.nio.file.Files.createFile(Files.java:648)
at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1360)
at 
kafka.server.LogDirFailureTest.testLogDirNotificationTimeout(LogDirFailureTest.scala:89)
{noformat}

> Flaky test suite LogDirFailureTest
> --
>
> Key: KAFKA-16225
> URL: https://issues.apache.org/jira/browse/KAFKA-16225
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Greg Harris
>Assignee: Omnia Ibrahim
>Priority: Major
>  Labels: flaky-test
> Attachments: Screenshot 2024-06-10 at 2.39.54 PM.png
>
>
> I see this failure on trunk and in PR builds for multiple methods in this 
> test suite:
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false>    
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)    
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)    
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)    
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)    
> at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1715)    
> at 
> kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:186)
>     
> at 
> kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:70){noformat}
> It appears this assertion is failing
> [https://github.com/apache/kafka/blob/f54975c33135140351c50370282e86c49c81bbdd/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715]
> The other error which is appearing is this:
> {noformat}
> 

[jira] [Updated] (KAFKA-16225) Flaky test suite LogDirFailureTest

2024-06-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16225:

Attachment: Screenshot 2024-06-10 at 2.39.54 PM.png

> Flaky test suite LogDirFailureTest
> --
>
> Key: KAFKA-16225
> URL: https://issues.apache.org/jira/browse/KAFKA-16225
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Greg Harris
>Assignee: Omnia Ibrahim
>Priority: Major
>  Labels: flaky-test
> Attachments: Screenshot 2024-06-10 at 2.39.54 PM.png
>
>
> I see this failure on trunk and in PR builds for multiple methods in this 
> test suite:
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false>    
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)    
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)    
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)    
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)    
> at kafka.utils.TestUtils$.causeLogDirFailure(TestUtils.scala:1715)    
> at 
> kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:186)
>     
> at 
> kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:70){noformat}
> It appears this assertion is failing
> [https://github.com/apache/kafka/blob/f54975c33135140351c50370282e86c49c81bbdd/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715]
> The other error which is appearing is this:
> {noformat}
> org.opentest4j.AssertionFailedError: Unexpected exception type thrown, 
> expected:  but was: 
>     
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     
> at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:67)    
> at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)    
> at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)    
> at 
> kafka.server.LogDirFailureTest.testProduceErrorsFromLogDirFailureOnLeader(LogDirFailureTest.scala:164)
>     
> at 
> kafka.server.LogDirFailureTest.testProduceErrorFromFailureOnLogRoll(LogDirFailureTest.scala:64){noformat}
> Failures appear to have started in this commit, but this does not indicate 
> that this commit is at fault: 
> [https://github.com/apache/kafka/tree/3d95a69a28c2d16e96618cfa9a1eb69180fb66ea]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9228:
-
Fix Version/s: 3.7.1

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-06-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14567.
-
  Assignee: (was: Kirk True)
Resolution: Fixed

Not 100% sure either, but I feel good enough to close this ticket for now. If 
we see it again, we can reopen or create a new ticket.

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: eos
> Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> 

[jira] [Commented] (KAFKA-16925) stream-table join does not immediately forward expired records on restart

2024-06-10 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853815#comment-17853815
 ] 

Matthias J. Sax commented on KAFKA-16925:
-

Thanks for filing this ticket – it's a known problem, not limited to 
stream-table join.

I don't think that initializing with `context.currentStreamTime()` is the right 
fix though; in general, I would prefer to fix this issue across the board, to 
avoid building operator-specific island solutions.
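For context, the fix proposed in the ticket would amount to something like the 
sketch below — illustrative only, since per the above it is exactly the kind of 
operator-specific change I'd rather avoid (field and method placement here are 
assumptions):

{code:java}
// Sketch of the ticket's idea, not an agreed fix: seed the buffer's observed
// stream time from the task's stream time, which (unlike the store-derived
// value) is recovered on restart.
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
    super.init(context);
    // After this, a record older than streamTime - gracePeriod is treated
    // as late and joins immediately, even right after a restart.
    observedStreamTime = Math.max(observedStreamTime, context.currentStreamTimeMs());
}
{code}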

> stream-table join does not immediately forward expired records on restart
> -
>
> Key: KAFKA-16925
> URL: https://issues.apache.org/jira/browse/KAFKA-16925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
>  introduced grace period for KStreamKTableJoin. This allows to join a stream 
> to a KTable backed by a Versioned state store.
> Upon receiving a record, it is put in a buffer until grace period is elapsed. 
> When the grace period elapses, the record is joined with its most recent 
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>  
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and 
> will dequeue when the record timestamp is less than or equal to stream time 
> minus the grace period.  Late records, out of the grace period, will be 
> executed right as they come in. (KIP-923){code}
>  
> However, this is not the case today on rebalance or restart. The reason is 
> that observedStreamTime is taken from the underlying state store, which loses 
> this information on rebalance/restart:  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>  
> If the task restarts and receives an expired record, the buffer considers 
> that this record has the maximum stream time observed so far, and puts it in 
> the buffer instead of immediately joining it.
>  
> {*}Example{*}:
>  * Grace period = 60s
>  * KTable contains (key, rightValue)
>  
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // 
> streamTime = T{code}
>  
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime 
> = T - 60s{code}
>  
> The processor should use currentStreamTime from Context instead. Which is 
> recovered on restart.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9228:
-
Fix Version/s: 3.8.0
   (was: 4.0.0)

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9228:
-
Fix Version/s: (was: 3.9.0)

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 4.0.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16837:
--
Fix Version/s: (was: 3.9.0)

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector config if the 
> *previous* task config contains a ConfigProvider value that leads to a 
> ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> actually any ConfigProvider that can raise an exception when something is 
> wrong with the config (like a missing resource) is affected.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance create the file 
> "/opt/kafka/provider.properties" with content
> {code:java}
> topics=test
> {code}
> 3. Create simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeds{*}, returning 200. 
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, 
> the connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Updated] (KAFKA-16838) Kafka Connect loads old tasks from removed connectors

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16838:
--
Fix Version/s: (was: 3.9.0)

> Kafka Connect loads old tasks from removed connectors
> -
>
> Key: KAFKA-16838
> URL: https://issues.apache.org/jira/browse/KAFKA-16838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> Hello,
> When creating connector we faced an error from one of our ConfigProviders 
> about not existing resource, but we didn't try to set that resource as config 
> value:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840)
>  {code}
> It looked like there was already a connector with the same name and the same 
> config, +but there wasn't.+
> After investigation we found out that a few months ago on that cloud there was 
> a connector with the same name and a different value for the config provider. 
> It was then removed, but for some reason, when we later tried to create a 
> connector with the same name, AbstractHerder tried to update tasks from our 
> previous connector.
> As an example I used FileConfigProvider, but actually any ConfigProvider that 
> can raise an exception when something is wrong with the config (like a missing 
> resource) is affected.
> We continued our investigation and found the issue 
> https://issues.apache.org/jira/browse/KAFKA-7745 that says Connect doesn't 
> send tombstone messages for *commit* and *task* records in the config topic of 
> Kafka Connect. As we remember, the config topic is `compact`, *which means 
> commit and task records are stored indefinitely* (months or years after the 
> connector is removed), while tombstones for connector messages are cleaned up 
> per the {{delete.retention.ms}} property. That impacts further connector 
> creations with the same name.
> We didn't investigate the reasons in ConfigClusterStore or how to avoid the 
> issue there, because we would {+}like to ask{+}: wouldn't it be better to fix 
> KAFKA-7745 and send tombstones for commit and task messages, as Connect does 
> for connector and target messages?
> In general, the test case looks like this (a sketch of the proposed tombstone 
> writes follows the list):
>  # Create a connector with a config provider pointing to resource1
>  # Remove the connector
>  # Remove resource1
>  # Wait 2-4 weeks :) (until the config topic is compacted and the tombstone 
> messages for the connector's config and target state are removed)
>  # Try to create a connector with the same name and a config provider pointing 
> to resource2
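A sketch of the tombstone writes proposed above — the topic name and key formats 
mirror Connect's config-topic conventions but are assumptions here, and `props` 
is assumed to hold the producer's bootstrap settings:

{code:java}
// A compacted topic drops a key only after a null-valued record (tombstone)
// is written for it; today Connect never writes these for task/commit keys.
try (Producer<String, byte[]> producer =
         new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer())) {
    producer.send(new ProducerRecord<>("connect-configs", "task-local-file-sink-0", null));
    producer.send(new ProducerRecord<>("connect-configs", "commit-local-file-sink", null));
    producer.flush();
}
{code}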
> 

[jira] [Updated] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-9228:
-
Fix Version/s: 4.0.0

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 4.0.0, 3.9.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16570 FenceProducers API returns "unexpected error" when succes… [kafka]

2024-06-10 Thread via GitHub


jolshan commented on PR #16229:
URL: https://github.com/apache/kafka/pull/16229#issuecomment-2159278838

   > producer.beginTransaction()
   > try {
   >   producer.send(record).get()
   >   fail("expected ProducerFencedException") // <- test failed here

Interesting. I thought it would fence immediately but we might see some 
extra concurrent transaction errors. I'm curious if we could commit such a 
transaction in this state, but maybe this is good enough.
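   For reference, the flow the test exercises is roughly the following — a sketch that assumes an enclosing method declared `throws Exception`, with error handling elided:
   
   ```java
   // Fence the transactional id via the Admin API this PR touches...
   admin.fenceProducers(Collections.singleton("my-transactional-id")).all().get();
   
   // ...then a previously-created producer with that id should fail.
   producer.beginTransaction();
   try {
       producer.send(record).get();   // fencing may surface here, or only...
       producer.commitTransaction();  // ...once the transaction is committed
   } catch (ProducerFencedException e) {
       producer.close();              // a fenced producer must be closed
   }
   ```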

The only other thing I could think of is whether a long blocking call will be an 
issue for users of this API. Curious what @C0urante thinks, since I believe he 
was the one who implemented this for Connect.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-9228) Reconfigured converters and clients may not be propagated to connector tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-9228.
--
Fix Version/s: 3.9.0
   Resolution: Fixed

> Reconfigured converters and clients may not be propagated to connector tasks
> 
>
> Key: KAFKA-9228
> URL: https://issues.apache.org/jira/browse/KAFKA-9228
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.3.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.9.0
>
>
> If an existing connector is reconfigured but the only changes are to its 
> converters and/or Kafka clients (enabled as of 
> [KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]),
>  the changes will not propagate to its tasks unless the connector also 
> generates task configs that differ from the existing task configs. Even after 
> this point, if the connector tasks are reconfigured, they will still not pick 
> up on the new converter and/or Kafka client configs.
> This is because the {{DistributedHerder}} only writes new task configurations 
> to the connect config topic [if the connector-provided task configs differ 
> from the task configs already in the config 
> topic|https://github.com/apache/kafka/blob/e499c960e4f9cfc462f1a05a110d79ffa1c5b322/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1285-L1332],
>  and neither of those contain converter or Kafka client configs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-06-10 Thread via GitHub


C0urante merged PR #16053:
URL: https://github.com/apache/kafka/pull/16053


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-06-10 Thread via GitHub


C0urante commented on PR #16053:
URL: https://github.com/apache/kafka/pull/16053#issuecomment-2159277004

   Last changes are trivial and build and unit tests passed locally, as well as 
the newly-introduced integration test. Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14919 sync topic configs test [kafka]

2024-06-10 Thread via GitHub


gharris1727 commented on PR #16143:
URL: https://github.com/apache/kafka/pull/16143#issuecomment-2159265832

   Okay, thank you for the clarification @anton-liauchuk . While turning off 
all-but-one operation avoids confusing one operation for another, it still 
leaves the opportunity for ambiguity to reappear in the future. And if we are 
avoiding certain situations due to ambiguity, that reduces our total test 
coverage.
   
   Also I think `testCreatePartitionsUseProvidedForwardingAdmin` remains 
ambiguous, at least from re-reading my original comment: 
https://github.com/apache/kafka/pull/13575#issuecomment-1513825281
   
   > I had to implement `incrementalAlterConfigs`
   
   Thanks, that definitely needed to be done. FakeLocalMetadataStore should 
have the granularity to assert that we used incremental vs the legacy API, and 
it doesn't right now. That would have caught this mistake earlier when MM2 
support for the new API was added.
   
   Please refactor the FakeLocalMetadataStore to make the ambiguity 
impossible, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16570 FenceProducers API returns "unexpected error" when succes… [kafka]

2024-06-10 Thread via GitHub


edoardocomar commented on PR #16229:
URL: https://github.com/apache/kafka/pull/16229#issuecomment-2159263370

   Hi @jolshan would you like any further changes ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-06-10 Thread via GitHub


C0urante commented on code in PR #16053:
URL: https://github.com/apache/kafka/pull/16053#discussion_r1633815726


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+
+import java.util.Map;
+
+/**
+ * Wrapper class for a connector configuration that has been used to generate 
task configurations
+ * Supports lazy {@link WorkerConfigTransformer#transform(Map) transformation}.
+ */
+public class AppliedConnectorConfig {
+
+private final Map<String, String> rawConfig;
+private volatile Map<String, String> transformedConfig;
+
+/**
+ * Create a new applied config that has not yet undergone
+ * {@link WorkerConfigTransformer#transform(Map) transformation}.
+ * @param rawConfig the non-transformed connector configuration; may be 
null
+ */
+public AppliedConnectorConfig(Map<String, String> rawConfig) {
+this.rawConfig = rawConfig;
+}
+
+/**
+ * If necessary, {@link WorkerConfigTransformer#transform(Map) transform} 
the raw
+ * connector config, then return the result. Transformed configurations 
are cached and
+ * returned in all subsequent calls.
+ * @param configTransformer the transformer to use, if no transformed 
connector
+ *  config has been cached yet; may be null
+ * @return the possibly-cached, transformed, connector config; may be null
+ */
+public Map<String, String> transformedConfig(WorkerConfigTransformer configTransformer) {
+if (transformedConfig != null || rawConfig == null)
+return transformedConfig;

Review Comment:
   Good point about reducing the reads of volatile fields! After thinking this 
through more, I actually don't think we need to make `transformedConfig` 
volatile, since the synchronization barrier we enter if it was null at the 
beginning of the method should guarantee that we pick up any writes performed 
by other threads.
   
   That said, your comment about the "lazy" approach resonated with me. This 
isn't on a critical path for performance, and it's not worth the hit in 
readability and the additional complexity of adding fine-grained 
synchronization. So I'll just make the whole method synchronized.
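   Roughly, i.e. (a sketch of the synchronized variant, modulo the real null-transformer handling):
   
   ```java
   public synchronized Map<String, String> transformedConfig(WorkerConfigTransformer configTransformer) {
       if (transformedConfig != null || rawConfig == null)
           return transformedConfig;
   
       // No volatile needed: readers and the single writer now synchronize
       // on the same monitor.
       transformedConfig = configTransformer != null
               ? configTransformer.transform(rawConfig)
               : rawConfig;
       return transformedConfig;
   }
   ```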



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs [kafka]

2024-06-10 Thread via GitHub


C0urante opened a new pull request, #16273:
URL: https://github.com/apache/kafka/pull/16273

   This test has been flaky since it was merged to trunk. To date, there have 
been 566 successful runs and 8 flaky failures (see [Gradle Enterprise 
analysis](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=kafka=America%2FNew_York=org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest=Wzhd=testReconfigureConnectorWithFailingTaskConfigs)).
   
   One possible cause of this is that we establish an expectation on the number 
of offset commits that need to take place (two per task) before reconfiguring 
the connector, but the assumption in the test is that these commits will take 
place after the tasks have been restarted. In some rare cases, it's possible 
that these commits will have already taken place before the tasks are 
restarted, which causes an assertion failure with the message 
"java.lang.AssertionError: Source connector should have published at least one 
record to new Kafka topic after being reconfigured".
   
   This patch should resolve those failures by establishing the expected number 
of offset commits _after_ the connector has been reconfigured and its tasks 
have been restarted, which should guarantee that the offset commits are 
performed by tasks with the updated connector configuration.
   
   In addition, the number of expected offset commits is reduced to one, since 
a single commit is all that we need in order to expect at least one record is 
present in the new Kafka topic.
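   Sketched in terms of the connector handle used by Connect's integration test framework — helper and constant names below are placeholders and should be checked against the actual test:
   
   ```java
   // Reconfigure first, and wait for the new generation of tasks to come up...
   connect.configureConnector(CONNECTOR_NAME, updatedConfig);
   connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
           CONNECTOR_NAME, NUM_TASKS, "connector tasks did not restart in time");
   
   // ...then register the offset-commit expectation, so it can only be
   // satisfied by tasks running the updated connector configuration.
   ConnectorHandle handle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
   handle.expectedCommits(1); // one commit implies at least one record in the new topic
   handle.awaitCommits(OFFSET_COMMIT_TIMEOUT_MS);
   ```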
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: update 3.8 upgrade guide for Kafka Streams [kafka]

2024-06-10 Thread via GitHub


mjsax commented on code in PR #16265:
URL: https://github.com/apache/kafka/pull/16265#discussion_r1633793710


##
docs/streams/upgrade-guide.html:
##
@@ -134,11 +134,44 @@
 
 
 Streams API changes in 3.8.0
+
+
+Kafka Streams now supports customizable task assignment strategies via 
the `task.assignor.class` configuration.
+The configuration can be set to the fully qualified class name of a 
custom task assignor implementation
+that has to implement the new 
`org.apache.kafka.streams.processor.assignment.TaskAssignor` interface.
+
+The new configuration also allows users to bring back the behavior of 
the old task assignor
+`StickyTaskAssignor` that was used before the introduction of 
`HighAvailabilityTaskAssignor`.
+If no custom task assignor is configured, the default task assignor 
`HighAvailabilityTaskAssignor` is used.
+
+This change also removes the internal config 
`internal.task.assignor.class` that was used for the same
+purpose. If you were using this config, you should switch to using 
`task.assignor.class` instead.
+
+For more details, see the public interface section of
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams;>KIP-924.
+
+
 
 The Processor API now support so-called read-only state stores, added 
via 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores;>KIP-813.
-   These stores don't have a dedicated changelog topic, but use their 
source topic for fault-tolerance,
-   simlar to KTables with source-topic optimization enabled.
+These stores don't have a dedicated changelog topic, but use their 
source topic for fault-tolerance,
+similar to KTables with source-topic optimization enabled.
+
+
+
+To improve detection of leaked state store iterators, we added new 
store-level metrics to track the number and
+age of open iterators. The new metrics are `num-open-iterators`, 
`iterator-duration-avg`, `iterator-duration-max`
+and `oldest-iterator-open-since-ms`. These metrics are available for 
all state stores, including RocksDB,
+in-memory, and custom stores. More details can be found in
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks;>KIP-989.
+
+
+
+To facilitate the implementation of dead-letter queues in Kafka 
Streams,
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception;>KIP-1036

Review Comment:
   This is a consumer KIP -- I don't think we should include it in the KS 
sections.
   
   However, 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores
 is missing.



##
docs/streams/upgrade-guide.html:
##
@@ -134,11 +134,44 @@
 
 
 Streams API changes in 3.8.0
+
+
+Kafka Streams now supports customizable task assignment strategies via 
the `task.assignor.class` configuration.
+The configuration can be set to the fully qualified class name of a 
custom task assignor implementation
+that has to implement the new 
`org.apache.kafka.streams.processor.assignment.TaskAssignor` interface.
+
+The new configuration also allows users to bring back the behavior of 
the old task assignor
+`StickyTaskAssignor` that was used before the introduction of 
`HighAvailabilityTaskAssignor`.
+If no custom task assignor is configured, the default task assignor 
`HighAvailabilityTaskAssignor` is used.
+
+This change also removes the internal config 
`internal.task.assignor.class` that was used for the same
+purpose. If you were using this config, you should switch to using 
`task.assignor.class` instead.
+
+For more details, see the public interface section of
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams;>KIP-924.
+
+
 
 The Processor API now support so-called read-only state stores, added 
via 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores;>KIP-813.
-   These stores don't have a dedicated changelog topic, but use their 
source topic for fault-tolerance,
-   simlar to KTables with source-topic optimization enabled.
+These stores don't have a dedicated changelog topic, but use their 
source topic for fault-tolerance,
+similar to KTables with source-topic optimization enabled.
+
+
+
+To improve detection of leaked state store iterators, we added new 
store-level metrics to track the number and
+age of open iterators. The new metrics are `num-open-iterators`, 
`iterator-duration-avg`, `iterator-duration-max`
+and `oldest-iterator-open-since-ms`. These metrics are available 

Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]

2024-06-10 Thread via GitHub


ganesh-sadanala commented on PR #16080:
URL: https://github.com/apache/kafka/pull/16080#issuecomment-2159220165

   @gharris1727 Thank you! I totally understand.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15630) Improve documentation of offset.lag.max

2024-06-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-15630:
---

Assignee: (was: Ganesh Sadanala)

> Improve documentation of offset.lag.max
> ---
>
> Key: KAFKA-15630
> URL: https://issues.apache.org/jira/browse/KAFKA-15630
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, mirrormaker
>Reporter: Mickael Maison
>Priority: Major
>  Labels: newbie
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> It would be good to expand on the role of this configuration on offset 
> translation and mention that it can be set to a smaller value, or even 0, to 
> help in scenarios when records may not flow constantly.
> The documentation string is here: 
> [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]

2024-06-10 Thread via GitHub


gharris1727 commented on PR #16080:
URL: https://github.com/apache/kafka/pull/16080#issuecomment-2159216114

   Hi @ganesh-sadanala Unfortunately I don't think we can move forward with 
this PR.
   
   You've linked and misrepresented two pieces of documentation that are not 
Apache-2.0 license compatible and can't be used to write this documentation. See 
the Cloudera Terms and Conditions: 
https://www.cloudera.com/legal/terms-and-conditions.html which is linked from 
the documentation you referenced.
   
   Please understand that we must take license compatibility very seriously, 
and can't accept contributions that risk mis-licensing others' intellectual 
property. When you are contributing to Kafka or other open-source projects in 
the future, it is best that the contributions you propose are totally your own 
work, derived from other open-source works. This includes avoiding the use of 
generative tools that may be trained on incompatibly-licensed works.
   
   I'm closing this PR, and letting another contributor pick this up instead. 
Thank you for your time, and I hope you can continue contributing to Kafka in 
the future.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]

2024-06-10 Thread via GitHub


gharris1727 closed pull request #16080: KAFKA-15630 Improve documentation of 
offset.lag.max
URL: https://github.com/apache/kafka/pull/16080


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-10 Thread via GitHub


brenden20 commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1633783863


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -123,6 +124,50 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+ConsumerConfig config = mock(ConsumerConfig.class);
+CommitRequestManager.MemberInfo memberInfo = new 
CommitRequestManager.MemberInfo();
+
+CommitRequestManager commitRequestManager = new CommitRequestManager(
+time,
+logContext,
+subscriptionState,
+config,
+coordinatorRequestManager,
+offsetCommitCallbackInvoker,
+"groupId",
+Optional.of("groupInstanceId"),
+metrics);
+
+Set<TopicPartition> requestedPartitions = new HashSet<>();
+TopicPartition topicPartition1 = new TopicPartition("topic-1", 1);
+requestedPartitions.add(topicPartition1);
+
+CommitRequestManager.OffsetFetchRequestState offsetFetchRequestState = 
commitRequestManager.new OffsetFetchRequestState(
+requestedPartitions,
+retryBackoffMs,
+retryBackoffMaxMs,
+1000,
+memberInfo);
+
+RequestState requestState = new RequestState(
+logContext,
+"CommitRequestManager",
+retryBackoffMs,
+retryBackoffMaxMs);
+
+String target = requestState.toStringBase() +
+", memberInfo=" + memberInfo +
+", expirationTimeMs=" + 
(offsetFetchRequestState.expirationTimeMs().isPresent() ? 
offsetFetchRequestState.expirationTimeMs() : "undefined") +

Review Comment:
   Looking at the code, I am able to do this; however, it will cause an issue 
with OffsetFetchRequestState.toStringBase(). ```expirationTimeMs()``` is needed 
to implement the ```toStringBase()``` method, since 
```OffsetFetchRequestState``` cannot access 
```RetriableRequestState.expirationTimeMs```, which is a private field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-06-10 Thread via GitHub


gharris1727 commented on code in PR #16053:
URL: https://github.com/apache/kafka/pull/16053#discussion_r1633644553


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+
+import java.util.Map;
+
+/**
+ * Wrapper class for a connector configuration that has been used to generate 
task configurations
+ * Supports lazy {@link WorkerConfigTransformer#transform(Map) transformation}.
+ */
+public class AppliedConnectorConfig {
+
+private final Map<String, String> rawConfig;
+private volatile Map<String, String> transformedConfig;
+
+/**
+ * Create a new applied config that has not yet undergone
+ * {@link WorkerConfigTransformer#transform(Map) transformation}.
+ * @param rawConfig the non-transformed connector configuration; may be 
null
+ */
+public AppliedConnectorConfig(Map<String, String> rawConfig) {
+this.rawConfig = rawConfig;
+}
+
+/**
+ * If necessary, {@link WorkerConfigTransformer#transform(Map) transform} 
the raw
+ * connector config, then return the result. Transformed configurations 
are cached and
+ * returned in all subsequent calls.
+ * @param configTransformer the transformer to use, if no transformed 
connector
+ *  config has been cached yet; may be null
+ * @return the possibly-cached, transformed, connector config; may be null
+ */
+public Map<String, String> transformedConfig(WorkerConfigTransformer configTransformer) {
+if (transformedConfig != null || rawConfig == null)
+return transformedConfig;

Review Comment:
   very optional nit: If you're doing double-checked locking, it makes sense to 
optimize for the fewest volatile reads, right? :)
   ```suggestion
   if (rawConfig == null)
   return null;
   Map<String, String> transformed = transformedConfig;
   if (transformed != null)
   return transformed;
   ```
   
   I would have been lazy and just marked the whole method as synchronized, but 
I'm fine with any thread-safe and correct implementation.
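
   For anyone following along, a minimal self-contained sketch of the double-checked locking shape suggested above; the names echo the class under review, but the code is illustrative rather than the PR's:

```java
import java.util.Map;
import java.util.function.UnaryOperator;

class LazyTransformedConfig {
    private final Map<String, String> rawConfig;
    private volatile Map<String, String> transformedConfig;

    LazyTransformedConfig(Map<String, String> rawConfig) {
        this.rawConfig = rawConfig;
    }

    Map<String, String> transformedConfig(UnaryOperator<Map<String, String>> transformer) {
        if (rawConfig == null)
            return null;
        // Fast path: a single volatile read once the cache is populated.
        Map<String, String> transformed = transformedConfig;
        if (transformed != null)
            return transformed;
        synchronized (this) {
            // Re-check under the lock so only one thread runs the transformation.
            if (transformedConfig == null)
                transformedConfig = transformer.apply(rawConfig);
            return transformedConfig;
        }
    }
}
```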



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15743: Fix the directory UUIDs in ReplicationQuotasTest [kafka]

2024-06-10 Thread via GitHub


wernerdv commented on PR #16077:
URL: https://github.com/apache/kafka/pull/16077#issuecomment-2159017044

   @soarez @pprovenzano @chia7712 Hello, please take a look at this small fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-10 Thread via GitHub


jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633660092


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -711,8 +752,16 @@ private void flushCurrentBatch() {
 coordinator.updateLastWrittenOffset(offset);
 
 if (offset != currentBatch.nextOffset) {
-throw new IllegalStateException("The state machine of 
coordinator " + tp + " is out of sync with the " +
-"underlying log. The last write returned " + 
offset + " while " + currentBatch.nextOffset + " was expected");
+log.error("The state machine of the coordinator {} is 
out of sync with the underlying log. " +
+"The last written offset returned is {} while the 
coordinator expected {}. The coordinator " +
+"will be reloaded in order to re-synchronize the 
state machine.",
+tp, offset, currentBatch.nextOffset);
+// Transition to FAILED state to unload the state 
machine and complete
+// exceptionally all the pending operations.
+transitionTo(CoordinatorState.FAILED);
+// Transition to LOADING to trigger the restoration of 
the state.
+transitionTo(CoordinatorState.LOADING);

Review Comment:
   Is it worth adding a test for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-10 Thread via GitHub


jolshan commented on PR #16215:
URL: https://github.com/apache/kafka/pull/16215#issuecomment-2159004801

   @dajac makes sense. I guess I just didn't know if there was a case where we 
had a record in a batch that wasn't written to the log yet, but we tried to 
commit offsets.
   
   Thanks for clarifying.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Merge trunk [kafka]

2024-06-10 Thread via GitHub


mjsax closed pull request #16271: Merge trunk
URL: https://github.com/apache/kafka/pull/16271


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Merge trunk [kafka]

2024-06-10 Thread via GitHub


mjsax opened a new pull request, #16271:
URL: https://github.com/apache/kafka/pull/16271

   Resolved merge conflict in `gradle/dependencies.gradle`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH [kafka]

2024-06-10 Thread via GitHub


brandboat commented on code in PR #16231:
URL: https://github.com/apache/kafka/pull/16231#discussion_r1633639458


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Threads(2)
+public class ConcurrentMapBenchmark {
+private static final int TIMES = 1000_000;
+
+@Param({"100"})
+private int mapSize;
+
+@Param({"0.1"})
+private double writePercentage;
+
+private Map<Integer, Integer> concurrentHashMap;
+private Map<Integer, Integer> copyOnWriteMap;
+private int writePerLoops;
+
+@Setup
+public void setup() {
+Map<Integer, Integer> mapTemplate = IntStream.range(0, mapSize).boxed()
+.collect(Collectors.toMap(i -> i, i -> i));
+concurrentHashMap = new ConcurrentHashMap<>(mapTemplate);
+copyOnWriteMap = new CopyOnWriteMap<>(mapTemplate);
+writePerLoops = TIMES / (int) Math.round(writePercentage * TIMES);
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testConcurrentHashMapGet(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent does add a new entry
+concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+blackhole.consume(concurrentHashMap.get(i % mapSize));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testConcurrentHashMapGetRandom(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent does add a new entry
+concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+
blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0, 
mapSize + 1)));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testCopyOnWriteMapGet(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent does add a new entry
+copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+blackhole.consume(copyOnWriteMap.get(i % mapSize));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testCopyOnWriteMapGetRandom(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent does add a new entry
+copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
+} else {
+
blackhole.consume(copyOnWriteMap.get(ThreadLocalRandom.current().nextInt(0, 
mapSize + 1)));
+}
+}
+}
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+  
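
For reference, a minimal standalone runner for a benchmark like the one above; this is illustrative, since in the Kafka repo the `jmh-benchmarks/jmh.sh` wrapper is the usual entry point:

```java
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class ConcurrentMapBenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        // Select the benchmark class by simple name; fork/warmup/measurement
        // settings come from the annotations on the benchmark itself.
        Options options = new OptionsBuilder()
                .include(ConcurrentMapBenchmark.class.getSimpleName())
                .build();
        new Runner(options).run();
    }
}
```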

Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-10 Thread via GitHub


dajac commented on PR #16215:
URL: https://github.com/apache/kafka/pull/16215#issuecomment-2158967828

   @jolshan Committing offsets above the last written offset is not allowed so 
it is probably not necessary. We already have tests to validate the logic to 
commit offsets up to the last written offset.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16928) Test all of the request and response methods in RaftUtil

2024-06-10 Thread Jira
José Armando García Sancio created KAFKA-16928:
--

 Summary: Test all of the request and response methods in RaftUtil
 Key: KAFKA-16928
 URL: https://issues.apache.org/jira/browse/KAFKA-16928
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio


Add a RaftUtilTest test suite that checks that the requests and responses 
constructed by RaftUtil can be serialized to all of the supported KRaft RPC 
versions.

The RPCs are:
 # Fetch
 # FetchSnapshot
 # Vote
 # BeginQuorumEpoch
 # EndQuorumEpoch

At the moment some of the RPCs are missing; this should be worked on after 
RaftUtil implements the creation of all KRaft RPCs.
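
A rough sketch of the shape such a check could take; the helper below is hypothetical, and the concrete messages would come from the RaftUtil factory methods once they all exist (`MessageUtil.toByteBuffer` and the `ApiKeys` version bounds are existing Kafka utilities):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.MessageUtil;

final class RaftUtilVersionCheckSketch {
    // Serialize a request or response at every supported version of its API
    // key; a version the message cannot be written at will throw here.
    static void assertSerializesAcrossVersions(Message message, ApiKeys api) {
        for (short version = api.oldestVersion(); version <= api.latestVersion(); version++) {
            ByteBuffer buffer = MessageUtil.toByteBuffer(message, version);
            if (buffer.remaining() == 0)
                throw new AssertionError("Empty serialization at version " + version);
        }
    }
}
```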



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16914: Added share group dynamic and broker configs [kafka]

2024-06-10 Thread via GitHub


adixitconfluent commented on code in PR #16268:
URL: https://github.com/apache/kafka/pull/16268#discussion_r1633612948


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1455,6 +1487,30 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} 
must be less than or equals " +
   s"to 
${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
 
+require(shareGroupMaxHeartbeatIntervalMs >= 
shareGroupMinHeartbeatIntervalMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must 
be greater than or equals " +
+s"to 
${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
+require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be 
greater than or equals " +
+s"to 
${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
+require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be 
less than or equals " +
+s"to 
${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")
+
+require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be 
greater than or equals " +
+s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
+require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be 
greater than or equals " +
+s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
+require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be 
less than or equals " +
+s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
+
+require(shareGroupMaxRecordLockDurationMs >= 
shareGroupRecordLockDurationMs,

Review Comment:
   sure, I'll add it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-10 Thread via GitHub


jolshan commented on PR #16215:
URL: https://github.com/apache/kafka/pull/16215#issuecomment-2158928929

   Looks pretty good. I was wondering -- do we want to test the behavior when 
we commit before a batch is flushed? (Is this possible?)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-10 Thread via GitHub


rajinisivaram commented on PR #13277:
URL: https://github.com/apache/kafka/pull/13277#issuecomment-2158922004

   @ivanyu Also need to fix unit tests that are failing because of the change 
from Node to LeastLoadedNode and resolve merge conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-10 Thread via GitHub


rajinisivaram commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1633579132


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1122,13 +1141,26 @@ public long maybeUpdate(long now) {
 
 // Beware that the behavior of this method and the computation of 
timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
-Node node = leastLoadedNode(now);
-if (node == null) {
+LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (leastLoadedNode.node() == null
+&& !leastLoadedNode.isAtLeastOneConnected()

Review Comment:
   Could we add a method to `LeastLoadedNode` that checks 
`leastLoadedNode.node() == null && !leastLoadedNode.isAtLeastOneConnected()`, 
since we use that here and in the Admin client? It would make it more obvious in 
`LeastLoadedNode` that the boolean is only used when the node is null. We could 
then perhaps remove the `isAtLeastOneConnected` method.
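
   Something like this minimal sketch, with an illustrative method name, on the `LeastLoadedNode` class quoted further down:

```java
// Illustrative helper: rebootstrapping is only warranted when no usable node
// was found and no connection is currently established.
public boolean hasNodeAvailableOrConnected() {
    return node != null || atLeastOneConnected;
}
```

   Callers would then replace the compound condition with `if (!leastLoadedNode.hasNodeAvailableOrConnected()) { /* rebootstrap */ }`.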



##
server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java:
##
@@ -75,6 +75,10 @@ public class ServerConfigs {
 public static final long SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = 
CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS;
 public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC;
 
+public static final String METADATA_RECOVERY_STRATEGY_CONFIG = 
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+public static final String METADATA_RECOVERY_STRATEGY_DOC = 
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = 
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;

Review Comment:
   These are no longer required?



##
clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+private final Node node;
+private final boolean atLeastOneConnected;
+
+public LeastLoadedNode(Node node, boolean atLeastOneConnected) {
+this.node = node;
+this.atLeastOneConnected = atLeastOneConnected;
+}
+
+public Node node() {
+return node;
+}
+
+public boolean isAtLeastOneConnected() {

Review Comment:
   We are setting this to true only for ready connections; should we rename the 
method to indicate that?



##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -705,16 +715,25 @@ public Node leastLoadedNode(long now) {
 Node foundCanConnect = null;
 Node foundReady = null;
 
+boolean atLeastOneNodeConnected = false;
+
 int offset = this.randOffset.nextInt(nodes.size());
 for (int i = 0; i < nodes.size(); i++) {
 int idx = (offset + i) % nodes.size();
 Node node = nodes.get(idx);
+
+if (!atLeastOneNodeConnected
+&& connectionStates.isReady(node.idString(), now)
+&& selector.isChannelReady(node.idString())) {
+atLeastOneNodeConnected = true;

Review Comment:
   Can we add one unit test for the various combinations to verify the returned 
values for LeastLoadedNode?
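
   For reference, a minimal sketch of assertions over the combinations; it exercises only the `LeastLoadedNode` holder shown in this PR, whereas a full test would also drive `NetworkClient.leastLoadedNode` through the various connection states:

```java
import org.apache.kafka.common.Node;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class LeastLoadedNodeSketchTest {
    @Test
    void coversNodeAndConnectedCombinations() {
        Node node = new Node(0, "localhost", 9092);
        // No node found, nothing connected: the rebootstrap case.
        assertNull(new LeastLoadedNode(null, false).node());
        assertFalse(new LeastLoadedNode(null, false).isAtLeastOneConnected());
        // No node found, but a connection is ready: no rebootstrap.
        assertTrue(new LeastLoadedNode(null, true).isAtLeastOneConnected());
        // A usable node was found.
        assertEquals(node, new LeastLoadedNode(node, true).node());
    }
}
```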



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-10 Thread via GitHub


jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633578691


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -583,11 +674,330 @@ private void unload() {
 }
 timer.cancelAll();
 deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+failCurrentBatch(Errors.NOT_COORDINATOR.exception());
 if (coordinator != null) {
 coordinator.onUnloaded();
 }
 coordinator = null;
 }
+
+/**
+ * Frees the current batch.
+ */
+private void freeCurrentBatch() {
+// Cancel the linger timeout.
+currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+// Release the buffer.
+bufferSupplier.release(currentBatch.buffer);
+
+currentBatch = null;
+}
+
+/**
+ * Writes the current (or pending) batch to the log. When the batch is 
written
+ * locally, a new snapshot is created in the snapshot registry and the 
events
+ * associated with the batch are added to the deferred event queue.
+ */
+private void writeCurrentBatch() {
+if (currentBatch != null) {
+try {
+// Write the records to the log and update the last 
written offset.
+long offset = partitionWriter.append(
+tp,
+currentBatch.verificationGuard,
+currentBatch.builder.build()
+);
+coordinator.updateLastWrittenOffset(offset);
+
+// Add all the pending deferred events to the deferred 
event queue.
+for (DeferredEvent event : currentBatch.events) {
+deferredEventQueue.add(offset, event);
+}
+
+// Free up the current batch.
+freeCurrentBatch();
+} catch (Throwable t) {
+failCurrentBatch(t);
+}
+}
+}
+
+/**
+ * Writes the current batch if it is transactional or if it has passed the append linger time.
+ */
+private void maybeWriteCurrentBatch(long currentTimeMs) {
+if (currentBatch != null) {
+if (currentBatch.builder.isTransactional() || 
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+writeCurrentBatch();
+}
+}
+}
+
+/**
+ * Fails the current batch, reverts the snapshot to the base/start offset of the
+ * batch, and fails all the associated events.
+ */
+private void failCurrentBatch(Throwable t) {
+if (currentBatch != null) {
+coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+for (DeferredEvent event : currentBatch.events) {
+event.complete(t);
+}
+freeCurrentBatch();
+}
+}
+
+/**
+ * Allocates a new batch if none already exists.
+ */
+private void maybeAllocateNewBatch(
+long producerId,
+short producerEpoch,
+VerificationGuard verificationGuard,
+long currentTimeMs
+) {
+if (currentBatch == null) {
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer,
+magic,
+compression,
+TimestampType.CREATE_TIME,
+0L,
+currentTimeMs,
+producerId,
+producerEpoch,
+0,
+producerId != RecordBatch.NO_PRODUCER_ID,
+false,
+RecordBatch.NO_PARTITION_LEADER_EPOCH,
+maxBatchSize
+);
+
+Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+if (appendLingerMs > 0) {
+lingerTimeoutTask = Optional.of(new 
TimerTask(appendLingerMs) {
+@Override
+public void run() {
+scheduleInternalOperation("FlushBatch", tp, () -> {
+if (this.isCancelled()) return;
+withActiveContextOrThrow(tp, 
CoordinatorContext::writeCurrentBatch);
+   

Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-10 Thread via GitHub


jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1631442094


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -583,11 +674,330 @@ private void unload() {
 }
 timer.cancelAll();
 deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+failCurrentBatch(Errors.NOT_COORDINATOR.exception());
 if (coordinator != null) {
 coordinator.onUnloaded();
 }
 coordinator = null;
 }
+
+/**
+ * Frees the current batch.
+ */
+private void freeCurrentBatch() {
+// Cancel the linger timeout.
+currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+// Release the buffer.
+bufferSupplier.release(currentBatch.buffer);
+
+currentBatch = null;
+}
+
+/**
+ * Writes the current (or pending) batch to the log. When the batch is 
written
+ * locally, a new snapshot is created in the snapshot registry and the 
events
+ * associated with the batch are added to the deferred event queue.
+ */
+private void writeCurrentBatch() {
+if (currentBatch != null) {
+try {
+// Write the records to the log and update the last 
written offset.
+long offset = partitionWriter.append(
+tp,
+currentBatch.verificationGuard,
+currentBatch.builder.build()
+);
+coordinator.updateLastWrittenOffset(offset);
+
+// Add all the pending deferred events to the deferred 
event queue.
+for (DeferredEvent event : currentBatch.events) {
+deferredEventQueue.add(offset, event);
+}
+
+// Free up the current batch.
+freeCurrentBatch();
+} catch (Throwable t) {
+failCurrentBatch(t);
+}
+}
+}
+
+/**
+ * Writes the current batch if it is transactional or if it has passed the append linger time.
+ */
+private void maybeWriteCurrentBatch(long currentTimeMs) {
+if (currentBatch != null) {
+if (currentBatch.builder.isTransactional() || 
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) {
+writeCurrentBatch();
+}
+}
+}
+
+/**
+ * Fails the current batch, reverts the snapshot to the base/start offset of the
+ * batch, and fails all the associated events.
+ */
+private void failCurrentBatch(Throwable t) {
+if (currentBatch != null) {
+coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
+for (DeferredEvent event : currentBatch.events) {
+event.complete(t);
+}
+freeCurrentBatch();
+}
+}
+
+/**
+ * Allocates a new batch if none already exists.
+ */
+private void maybeAllocateNewBatch(
+long producerId,
+short producerEpoch,
+VerificationGuard verificationGuard,
+long currentTimeMs
+) {
+if (currentBatch == null) {
+LogConfig logConfig = partitionWriter.config(tp);
+byte magic = logConfig.recordVersion().value;
+int maxBatchSize = logConfig.maxMessageSize();
+long prevLastWrittenOffset = coordinator.lastWrittenOffset();
+ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+
+MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+buffer,
+magic,
+compression,
+TimestampType.CREATE_TIME,
+0L,
+currentTimeMs,
+producerId,
+producerEpoch,
+0,
+producerId != RecordBatch.NO_PRODUCER_ID,
+false,
+RecordBatch.NO_PARTITION_LEADER_EPOCH,
+maxBatchSize
+);
+
+Optional<TimerTask> lingerTimeoutTask = Optional.empty();
+if (appendLingerMs > 0) {
+lingerTimeoutTask = Optional.of(new 
TimerTask(appendLingerMs) {
+@Override
+public void run() {
+scheduleInternalOperation("FlushBatch", tp, () -> {
+if (this.isCancelled()) return;
+withActiveContextOrThrow(tp, 
CoordinatorContext::writeCurrentBatch);
+   

[jira] [Created] (KAFKA-16927) Adding tests for restarting followers receiving leader endpoint correctly

2024-06-10 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-16927:


 Summary: Adding tests for restarting followers receiving leader 
endpoint correctly
 Key: KAFKA-16927
 URL: https://issues.apache.org/jira/browse/KAFKA-16927
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: Alyssa Huang


We'll need to test that restarting followers are populated with the correct leader 
endpoint after receiving a BeginQuorumEpochRequest. This depends on KAFKA-16536 
and the voter RPC changes being done.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16926) Optimize BeginQuorumEpoch heartbeat

2024-06-10 Thread Alyssa Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alyssa Huang updated KAFKA-16926:
-
Component/s: kraft

> Optimize BeginQuorumEpoch heartbeat
> ---
>
> Key: KAFKA-16926
> URL: https://issues.apache.org/jira/browse/KAFKA-16926
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Alyssa Huang
>Priority: Minor
>
> Instead of sending out BeginQuorum requests to every voter on a cadence, we 
> can save on some requests by only sending to those which have not fetched 
> within the fetch timeout.
> Split from KAFKA-16536



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16926) Optimize BeginQuorumEpoch heartbeat

2024-06-10 Thread Alyssa Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alyssa Huang updated KAFKA-16926:
-
Description: 
Instead of sending out BeginQuorum requests to every voter on a cadence, we can 
save on some requests by only sending to those which have not fetched within 
the fetch timeout.

Split from KAFKA-16536

  was:Instead of sending out BeginQuorum requests to every voter on a cadence, 
we can save on some requests by only sending to those which have not fetched 
within the fetch timeout.


> Optimize BeginQuorumEpoch heartbeat
> ---
>
> Key: KAFKA-16926
> URL: https://issues.apache.org/jira/browse/KAFKA-16926
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alyssa Huang
>Priority: Minor
>
> Instead of sending out BeginQuorum requests to every voter on a cadence, we 
> can save on some requests by only sending to those which have not fetched 
> within the fetch timeout.
> Split from KAFKA-16536



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16926) Optimize BeginQuorumEpoch heartbeat

2024-06-10 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-16926:


 Summary: Optimize BeginQuorumEpoch heartbeat
 Key: KAFKA-16926
 URL: https://issues.apache.org/jira/browse/KAFKA-16926
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alyssa Huang


Instead of sending out BeginQuorum requests to every voter on a cadence, we can 
save on some requests by only sending to those which have not fetched within 
the fetch timeout.
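
A rough sketch of the proposed filter; every name here is hypothetical, since the real KRaft leader state tracks voter fetch times through its own structures:

```java
import java.util.Map;
import java.util.Set;

// Hypothetical shape of the optimization: send BeginQuorumEpoch only to voters
// whose last fetch is older than the fetch timeout, not to every voter.
final class BeginQuorumEpochHeartbeatSketch {
    interface Sender {
        void sendBeginQuorumEpochRequest(int voterId);
    }

    static void maybeSend(Set<Integer> voterIds, Map<Integer, Long> lastFetchTimeMs,
                          long fetchTimeoutMs, long currentTimeMs, Sender sender) {
        for (int voterId : voterIds) {
            long lastFetch = lastFetchTimeMs.getOrDefault(voterId, Long.MIN_VALUE);
            if (currentTimeMs - lastFetch >= fetchTimeoutMs) {
                sender.sendBeginQuorumEpochRequest(voterId);
            }
        }
    }
}
```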



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16914: Added share group dynamic and broker configs [kafka]

2024-06-10 Thread via GitHub


omkreddy commented on code in PR #16268:
URL: https://github.com/apache/kafka/pull/16268#discussion_r1633540628


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1455,6 +1487,30 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   s"${GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG} 
must be less than or equals " +
   s"to 
${GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
 
+require(shareGroupMaxHeartbeatIntervalMs >= 
shareGroupMinHeartbeatIntervalMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG} must 
be greater than or equals " +
+s"to 
${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
+require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be 
greater than or equals " +
+s"to 
${ShareGroupConfigs.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG}")
+require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG} must be 
less than or equals " +
+s"to 
${ShareGroupConfigs.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG}")
+
+require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG} must be 
greater than or equals " +
+s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
+require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be 
greater than or equals " +
+s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG}")
+require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
+  s"${ShareGroupConfigs.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG} must be 
less than or equals " +
+s"to ${ShareGroupConfigs.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG}")
+
+require(shareGroupMaxRecordLockDurationMs >= 
shareGroupRecordLockDurationMs,

Review Comment:
   Don't we need to add a check for 
`SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG`, similar to the checks above 
for `shareGroupMaxSessionTimeoutMs`?
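
   A sketch of the missing checks, assuming min/max record lock duration configs and accessors that mirror the naming of the ones already quoted (in Scala, matching KafkaConfig.scala):

```scala
require(shareGroupRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
  s"${ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
    s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}")
require(shareGroupMaxRecordLockDurationMs >= shareGroupMinRecordLockDurationMs,
  s"${ShareGroupConfigs.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG} must be greater than or equals " +
    s"to ${ShareGroupConfigs.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG}")
```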



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -292,6 +292,23 @@ object KafkaConfig {
   .define(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_DOC)
   .define(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
STRING, GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, 
CaseInsensitiveValidString.in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]):
 _*), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_DOC)
 
+  /** Share Group Configurations **/
+  // Internal configuration used by integration and system tests.
+  .defineInternal(ShareGroupConfigs.SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, 
ShareGroupConfigs.SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, 
ShareGroupConfigs.SHARE_GROUP_ENABLE_DOC)
+  .define(ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), 
MEDIUM, ShareGroupConfigs.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
+  .define(ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, 
INT, ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, 
between(1000, 6), MEDIUM, 
ShareGroupConfigs.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)

Review Comment:
   `between(1000, 6)` - This should be an `atLeast(...)` validator, with a 
`require` check added below alongside the other validation checks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16922 : Adding unit tests for NewTopic [kafka]

2024-06-10 Thread via GitHub


muralibasani commented on PR #16255:
URL: https://github.com/apache/kafka/pull/16255#issuecomment-2158838080

   @chia7712 looks like the failed tests in the CI are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16770; [2/2] Coalesce records into bigger batches [kafka]

2024-06-10 Thread via GitHub


jolshan commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633532863


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -583,11 +674,339 @@ private void unload() {
 }
 timer.cancelAll();
 deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
+failCurrentBatch(Errors.NOT_COORDINATOR.exception());
 if (coordinator != null) {
 coordinator.onUnloaded();
 }
 coordinator = null;
 }
+
+/**
+ * Frees the current batch.
+ */
+private void freeCurrentBatch() {
+// Cancel the linger timeout.
+currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
+
+// Release the buffer.
+bufferSupplier.release(currentBatch.buffer);
+
+currentBatch = null;
+}
+
+/**
+ * Flushes the current (or pending) batch to the log. When the batch 
is written
+ * locally, a new snapshot is created in the snapshot registry and the 
events
+ * associated with the batch are added to the deferred event queue.
+ */
+private void flushCurrentBatch() {
+if (currentBatch != null) {
+try {
+// Write the records to the log and update the last 
written offset.
+long offset = partitionWriter.append(
+tp,
+currentBatch.verificationGuard,
+currentBatch.builder.build()
+);
+coordinator.updateLastWrittenOffset(offset);
+
+if (offset != currentBatch.nextOffset) {
+throw new IllegalStateException("The state machine of 
coordinator " + tp + " is out of sync with the " +
+"underlying log. The last write returned " + 
offset + " while " + currentBatch.nextOffset + " was expected");

Review Comment:
   This could be a better approach. I was not aware there were mechanisms to 
validate before writing.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16894: Define group.version=2 [kafka]

2024-06-10 Thread via GitHub


AndrewJSchofield commented on PR #16212:
URL: https://github.com/apache/kafka/pull/16212#issuecomment-2158830511

   @omkreddy We need KIP-1014 (under discussion) to do this. I'll leave this PR 
in draft state for now. We need to find another way to turn KIP-932 on and off 
in the meantime.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


lucasbru commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1633531270


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -199,4 +234,4 @@ public void prepareFindCoordinatorResponse(Errors error) {
 private Node mockNode() {
 return new Node(0, "localhost", 99);
 }
-}
+}

Review Comment:
   Yeah, I saw that as well, but given that we are so close to code freeze, I 
let it slide :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


lucasbru merged PR #16043:
URL: https://github.com/apache/kafka/pull/16043


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


kirktrue commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1633528798


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -199,4 +234,4 @@ public void prepareFindCoordinatorResponse(Errors error) {
 private Node mockNode() {
 return new Node(0, "localhost", 99);
 }
-}
+}

Review Comment:
   Super nitpick: avoid whitespace changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


