[GitHub] [kafka] fvaleri opened a new pull request, #13784: [MINOR] Fix for MetadataQuorumCommandErrorTest.testRelativeTimeMs

2023-05-30 Thread via GitHub


fvaleri opened a new pull request, #13784:
URL: https://github.com/apache/kafka/pull/13784

   This change should fix the following error on CI/CD.
   
   ```sh
   org.apache.kafka.common.KafkaException: Error while computing relative time, possible drift in system clock.
   Current timestamp is 1685472793724, test timestamp is 1685472793724
   ```
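
   The two timestamps in the error are identical, which suggests that the check rejects a timestamp equal to the current time. The following is only a sketch under that assumption (the PR text does not describe the actual fix). `RelativeTimeSketch` and its `relativeTimeMs` signature are hypothetical stand-ins, not the code in the Kafka tools module.

```java
import java.time.Instant;

class RelativeTimeSketch {
    // Hypothetical helper: returns (now - timestamp) in milliseconds.
    // A timestamp equal to "now" is accepted and yields 0.
    static long relativeTimeMs(long timestampMs, long nowMs) {
        Instant timestamp = Instant.ofEpochMilli(timestampMs);
        Instant now = Instant.ofEpochMilli(nowMs);
        // Reject only timestamps at or before the epoch, or in the future.
        if (!timestamp.isAfter(Instant.EPOCH) || timestamp.isAfter(now)) {
            throw new IllegalStateException(
                "Error while computing relative time, possible drift in system clock. "
                    + "Current timestamp is " + nowMs + ", test timestamp is " + timestampMs);
        }
        return nowMs - timestampMs;
    }
}
```

   Passing the current time in as a parameter, rather than reading the clock inside the method, also lets a test exercise the equal-timestamp case deterministically.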
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13738: KAFKA-14982: Improve the kafka-metadata-quorum output

2023-05-30 Thread via GitHub


dajac commented on PR #13738:
URL: https://github.com/apache/kafka/pull/13738#issuecomment-1569434420

   @fvaleri testRelativeTimeMs seems to fail regularly. There is an example here:
   https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13782/1/tests.
   Could you take a look?





[GitHub] [kafka] alok123t commented on a diff in pull request #13772: MINOR: Add helper util `Snapshots.lastContainedLogTimestamp`

2023-05-30 Thread via GitHub


alok123t commented on code in PR #13772:
URL: https://github.com/apache/kafka/pull/13772#discussion_r1210872444


##
raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java:
##
@@ -151,4 +155,24 @@ public static Path markForDelete(Path logDir, 
OffsetAndEpoch snapshotId) {
 );
 }
 }
+
+public static long lastContainedLogTimestamp(RawSnapshotReader reader) {
+try (RecordsSnapshotReader recordsSnapshotReader =
+ RecordsSnapshotReader.of(
+ reader,
+ new IdentitySerde(),

Review Comment:
   done in 
https://github.com/apache/kafka/pull/13772/commits/b10d32c85c9047b15d355239c44da62d9921bb6d






[GitHub] [kafka] jeffkbkim commented on pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-30 Thread via GitHub


jeffkbkim commented on PR #13267:
URL: https://github.com/apache/kafka/pull/13267#issuecomment-1569176269

   test failure
   `testSendOffsetsToTransactionTimeout(String).quorum=zk – 
kafka.api.TransactionsTest`
   
   ```
   org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
3000ms while awaiting InitProducerId
   ```
   This looks related to the changes, but the test passed locally several 
times.





[GitHub] [kafka] cmccabe commented on a diff in pull request #13759: KAFKA-15019: Improve handling of broker heartbeat timeouts

2023-05-30 Thread via GitHub


cmccabe commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1210858865


##
metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.TimeoutException;
+
+import java.util.concurrent.ExecutionException;
+
+
+public class ControllerExceptions {
+/**
+ * Check if an exception is a normal timeout exception.
+ *
+ * @param exception The exception to check.
+ * @return  True if the exception is a normal timeout 
exception and NOT a timeout
+ *  exception generated by the controller shutting 
down.
+ */
+public static boolean isNormalTimeoutException(Throwable exception) {
+if (exception == null) return false;
+if (exception instanceof ExecutionException) {
+exception = exception.getCause();
+if (exception == null) return false;
+}
+if (!(exception instanceof TimeoutException)) return false;
+if (exception.getMessage() != null &&
+exception.getMessage().equals("The controller is shutting 
down.")) {
+return false;
+}
+return true;

Review Comment:
   I removed the check for the exception text. (It wasn't actually doing 
anything here in any case!) This is something I'll revisit in a follow-on PR 
(there are a few corner cases around error handling and queue shutdown to fix 
up)






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-05-30 Thread via GitHub


jeffkbkim commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1210852722


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(value = 60)
+public class MultiThreadedEventProcessorTest {
+
+private static class FutureEvent implements CoordinatorEvent {
+private final int key;
+private final CompletableFuture future;
+private final Supplier supplier;
+
+FutureEvent(
+int key,
+Supplier supplier
+) {
+this.key = key;
+this.future = new CompletableFuture<>();
+this.supplier = supplier;
+}
+
+@Override
+public void run() {
+future.complete(supplier.get());
+}
+
+@Override
+public void complete(Throwable ex) {
+future.completeExceptionally(ex);
+}
+
+@Override
+public Integer key() {
+return key;
+}
+
+public CompletableFuture future() {
+return future;
+}
+
+@Override
+public String toString() {
+return "FutureEvent(key=" + key + ")";
+}
+}
+
+@Test
+public void testCreateAndClose() throws Exception {
+CoordinatorEventProcessor eventProcessor = new 
MultiThreadedEventProcessor(
+new LogContext(),
+"event-processor-",
+2
+);
+eventProcessor.close();
+}
+
+@Test
+public void testEventsAreProcessed() throws Exception {

Review Comment:
   Does `EventAccumulatorTest#testKeyConcurrentProcessingAndOrdering()` actually 
test concurrent ordering? I'm wondering if we can add such a test here or in 
`EventAccumulatorTest`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} 

[GitHub] [kafka] rondagostino commented on a diff in pull request #13780: KAFKA-15039: Reduce logging level to trace in PartitionChangeBuilder.…

2023-05-30 Thread via GitHub


rondagostino commented on code in PR #13780:
URL: https://github.com/apache/kafka/pull/13780#discussion_r1210847987


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -220,13 +220,15 @@ private boolean isValidNewLeader(int replica) {
 private void tryElection(PartitionChangeRecord record) {
 ElectionResult electionResult = electLeader();
 if (electionResult.node != partition.leader) {
-log.debug(
-"Setting new leader for topicId {}, partition {} to {} using 
{} election",
-topicId,
-partitionId,
-electionResult.node,
-electionResult.unclean ? "an unclean" : "a clean"
-);
+if (log.isTraceEnabled()) {
+log.trace(
+"Setting new leader for topicId {}, partition {} to {} 
using {} election",
+topicId,
+partitionId,
+electionResult.node,
+electionResult.unclean ? "an unclean" : "a clean"

Review Comment:
   I don't think it is possible to do so since we don't know the result of the 
election anywhere else.  Unclean leader election may be enabled, for example, 
but it may not ever occur.  It is only here that we know.  I added additional 
logic to ensure that we will always log an unclean election at the WARN level.
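
   The rule described in this comment (unclean elections always logged at WARN, clean elections only at TRACE) can be sketched as below. This is an illustration only: `ElectionLogLevelSketch`, `levelFor`, and `message` are hypothetical names, and the JDK's `System.Logger.Level` is used here as a stand-in for the logging facade used in Kafka.

```java
class ElectionLogLevelSketch {
    // Unclean elections are always surfaced as warnings; clean ones stay at trace.
    static System.Logger.Level levelFor(boolean unclean) {
        return unclean ? System.Logger.Level.WARNING : System.Logger.Level.TRACE;
    }

    // Builds the message text in the same shape as the log line quoted in the diff.
    static String message(String topicId, int partitionId, int node, boolean unclean) {
        return "Setting new leader for topicId " + topicId
            + ", partition " + partitionId
            + " to " + node
            + " using " + (unclean ? "an unclean" : "a clean") + " election";
    }

    public static void main(String[] args) {
        System.Logger log = System.getLogger("election-sketch");
        boolean unclean = true;
        log.log(levelFor(unclean), message("topic-a", 0, 2, unclean));
    }
}
```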






[GitHub] [kafka] cmccabe commented on a diff in pull request #13759: KAFKA-15019: Improve handling of broker heartbeat timeouts

2023-05-30 Thread via GitHub


cmccabe commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1210806096


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1106,6 +1107,34 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testTimedOutHeartbeats(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(3).
+setNumControllerNodes(1).build()).
+  setConfigProp(KafkaConfig.BrokerHeartbeatIntervalMsProp, 10.toString).
+  setConfigProp(KafkaConfig.BrokerSessionTimeoutMsProp, 1000.toString).
+  build()
+try {
+  cluster.format()
+  cluster.startup()
+  val controller = cluster.controllers().values().iterator().next()
+  controller.controller.waitForReadyBrokers(3).get()
+  TestUtils.retry(6) {
+val latch = 
controller.controller.asInstanceOf[QuorumController].pause()
+Thread.sleep(1001)
+latch.countDown()
+
controller.controller.asInstanceOf[QuorumController].pause().countDown()

Review Comment:
   Removed






[GitHub] [kafka] cmccabe commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

2023-05-30 Thread via GitHub


cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210817910


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -566,15 +578,46 @@ public void run() throws Exception {
 ZkMigrationLeadershipState newState = 
migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
 offsetAndEpochAfterMigration.offset(),
 offsetAndEpochAfterMigration.epoch());
-applyMigrationOperation("Finished migrating ZK data", state -> 
zkMigrationClient.setMigrationRecoveryState(newState));
-
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+applyMigrationOperation("Finished migrating ZK data to KRaft", 
state -> zkMigrationClient.setMigrationRecoveryState(newState));
+transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
 } catch (Throwable t) {
 zkRecordConsumer.abortMigration();
 super.handleException(t);
 }
 }
 }
 
+static KRaftMigrationOperationConsumer countingOperationConsumer(
+Map dualWriteCounts,
+BiConsumer operationConsumer
+) {
+return (opType, logMsg, operation) -> {
+dualWriteCounts.compute(opType, (key, value) -> {
+if (value == null) {
+return 1;
+} else {
+return value + 1;
+}
+});
+operationConsumer.accept(logMsg, operation);
+};
+}
+
+
+class SyncKRaftMetadataEvent extends MigrationEvent {
+@Override
+public void run() throws Exception {
+if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
+log.info("Performing a full metadata sync from KRaft to ZK.");
+Map dualWriteCounts = new HashMap<>();
+zkMetadataWriter.handleSnapshot(image, 
countingOperationConsumer(
+dualWriteCounts, 
KRaftMigrationDriver.this::applyMigrationOperation));
+log.info("Made the following ZK writes when reconciling with 
KRaft state: {}", dualWriteCounts);

Review Comment:
   Hmm, this will be printed unsorted, right? I guess this is a nitpick, but 
it would be better to sort.
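
   One way to get sorted output, sketched here under assumptions: collect the counts in a `TreeMap` instead of a `HashMap`, so the map prints in key order. The key type is elided in the quoted diff, so `String` keys are an assumption, and `SortedCountsSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

class SortedCountsSketch {
    // Counts occurrences per operation type. TreeMap iterates (and prints)
    // its entries in the natural order of the keys.
    static Map<String, Integer> count(String... opTypes) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String opType : opTypes) {
            // merge() replaces the explicit null check used with compute().
            counts.merge(opType, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {CONFIG=1, TOPIC=2}
        System.out.println(count("TOPIC", "CONFIG", "TOPIC"));
    }
}
```

   If the counting map must remain a `HashMap`, wrapping it as `new TreeMap<>(dualWriteCounts)` at the log call would sort it only when printing.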






[GitHub] [kafka] gharris1727 opened a new pull request, #13783: MINOR: Remove spurious warning about plugin.path config provider usage when null

2023-05-30 Thread via GitHub


gharris1727 opened a new pull request, #13783:
URL: https://github.com/apache/kafka/pull/13783

   If there is no plugin.path specified, this warning prints, and says `null` 
doesn't match `null`.
   This is misleading, and is caused by `rawPluginPath` being `null` and 
`transformedPluginPath` being `"null"`.
   Change the Objects.toString to pass-through the null, so that the condition 
doesn't fire.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
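
   The null handling described above can be illustrated with the two overloads of `java.util.Objects.toString`. This is a minimal sketch: `NullToStringSketch` and `wouldWarn` are hypothetical stand-ins for the comparison in the plugin path check, not the Connect code itself.

```java
import java.util.Objects;

class NullToStringSketch {
    // One-argument form: a null reference becomes the string "null".
    static String stringify(Object value) {
        return Objects.toString(value);
    }

    // Two-argument form with a null default: a null reference stays null.
    static String passThrough(Object value) {
        return Objects.toString(value, null);
    }

    // Hypothetical stand-in for the warning condition: raw and transformed differ.
    static boolean wouldWarn(String raw, String transformed) {
        return !Objects.equals(raw, transformed);
    }

    public static void main(String[] args) {
        System.out.println(wouldWarn(null, stringify(null)));   // null vs "null"
        System.out.println(wouldWarn(null, passThrough(null))); // null vs null
    }
}
```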
   





[GitHub] [kafka] cmccabe commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

2023-05-30 Thread via GitHub


cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210814541


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -566,15 +578,46 @@ public void run() throws Exception {
 ZkMigrationLeadershipState newState = 
migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
 offsetAndEpochAfterMigration.offset(),
 offsetAndEpochAfterMigration.epoch());
-applyMigrationOperation("Finished migrating ZK data", state -> 
zkMigrationClient.setMigrationRecoveryState(newState));
-
transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
+applyMigrationOperation("Finished migrating ZK data to KRaft", 
state -> zkMigrationClient.setMigrationRecoveryState(newState));

Review Comment:
   Maybe add a comment here about how we always go through the sync kraft -> zk 
state here, even immediately after we've loaded from zk.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

2023-05-30 Thread via GitHub


cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210813001


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -149,10 +152,9 @@ CompletableFuture migrationState() {
 }
 
 private void recoverMigrationStateFromZK() {
-log.info("Recovering migration state from ZK");
-applyMigrationOperation("Recovery", 
zkMigrationClient::getOrCreateMigrationRecoveryState);
+applyMigrationOperation("Recovering migration state from ZK", 
zkMigrationClient::getOrCreateMigrationRecoveryState);
 String maybeDone = migrationLeadershipState.zkMigrationComplete() ? 
"done" : "not done";
-log.info("Recovered migration state {}. ZK migration is {}.", 
migrationLeadershipState, maybeDone);
+log.info("ZK migration is {}.", maybeDone);

Review Comment:
   I'd really prefer to say something like "Initial ZK load is done" / "Initial 
ZK load is not done"
   
   We should also change `ZkMigrationLeadershipState.zkMigrationComplete` -> 
`ZkMigrationLeadershipState.initialZkLoadComplete`
   
   After all, the whole process here is technically "ZK migration", not just the 
initial load.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

2023-05-30 Thread via GitHub


cmccabe commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1210810667


##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -303,6 +304,14 @@ class ZkMigrationClient(
 new 
util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava)
   }
 
+  override def readProducerId(): util.Optional[java.lang.Long] = {

Review Comment:
   This doesn't seem to be returning nextProducerId, it's returning the current 
producer id.
   
   Why not just have `writeProducerId` take a `ProducerIdsBlock` object so we 
don't have to do a bunch of dubious translation?






[GitHub] [kafka] jolshan merged pull request #13770: MINOR: Add transaction verification config to producerStateManager config

2023-05-30 Thread via GitHub


jolshan merged PR #13770:
URL: https://github.com/apache/kafka/pull/13770





[GitHub] [kafka] cmccabe commented on a diff in pull request #13759: KAFKA-15019: Improve handling of broker heartbeat timeouts

2023-05-30 Thread via GitHub


cmccabe commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1210806096


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1106,6 +1107,34 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testTimedOutHeartbeats(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(3).
+setNumControllerNodes(1).build()).
+  setConfigProp(KafkaConfig.BrokerHeartbeatIntervalMsProp, 10.toString).
+  setConfigProp(KafkaConfig.BrokerSessionTimeoutMsProp, 1000.toString).
+  build()
+try {
+  cluster.format()
+  cluster.startup()
+  val controller = cluster.controllers().values().iterator().next()
+  controller.controller.waitForReadyBrokers(3).get()
+  TestUtils.retry(6) {
+val latch = 
controller.controller.asInstanceOf[QuorumController].pause()
+Thread.sleep(1001)
+latch.countDown()
+
controller.controller.asInstanceOf[QuorumController].pause().countDown()

Review Comment:
   I'll add a better explanation for this






[GitHub] [kafka] jolshan commented on pull request #13770: MINOR: Add transaction verification config to producerStateManager config

2023-05-30 Thread via GitHub


jolshan commented on PR #13770:
URL: https://github.com/apache/kafka/pull/13770#issuecomment-1569075902

   Checked the build: the streams upgrade tests had an issue that failed the 
JDK 17 build.
   There were also two test failures:
   
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.tools.MetadataQuorumCommandErrorTest.testRelativeTimeMs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13770/3/testReport/junit/org.apache.kafka.tools/MetadataQuorumCommandErrorTest/Build___JDK_8_and_Scala_2_12___testRelativeTimeMs__/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[1] 
true](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13770/3/testReport/junit/org.apache.kafka.streams.integration/SmokeTestDriverIntegrationTest/Build___JDK_11_and_Scala_2_131__true/)
   
   Given the nature of this change (basically a no-op except for some test 
files, which all passed), I will go ahead and merge.





[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of broker heartbeat timeouts

2023-05-30 Thread via GitHub


rondagostino commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1210720723


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1106,6 +1107,34 @@ class KRaftClusterTest {
   cluster.close()
 }
   }
+
+  @Test
+  def testTimedOutHeartbeats(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(3).
+setNumControllerNodes(1).build()).
+  setConfigProp(KafkaConfig.BrokerHeartbeatIntervalMsProp, 10.toString).
+  setConfigProp(KafkaConfig.BrokerSessionTimeoutMsProp, 1000.toString).
+  build()
+try {
+  cluster.format()
+  cluster.startup()
+  val controller = cluster.controllers().values().iterator().next()
+  controller.controller.waitForReadyBrokers(3).get()
+  TestUtils.retry(6) {
+val latch = 
controller.controller.asInstanceOf[QuorumController].pause()
+Thread.sleep(1001)
+latch.countDown()
+
controller.controller.asInstanceOf[QuorumController].pause().countDown()

Review Comment:
   What is the purpose of this line?  Seems it is an unnecessary extra 
pause/resume, and the test passes without it.  Can we remove it?



##
metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.TimeoutException;
+
+import java.util.concurrent.ExecutionException;
+
+
+public class ControllerExceptions {
+/**
+ * Check if an exception is a normal timeout exception.
+ *
+ * @param exception The exception to check.
+ * @return  True if the exception is a normal timeout 
exception and NOT a timeout
+ *  exception generated by the controller shutting 
down.
+ */
+public static boolean isNormalTimeoutException(Throwable exception) {
+if (exception == null) return false;
+if (exception instanceof ExecutionException) {
+exception = exception.getCause();
+if (exception == null) return false;
+}
+if (!(exception instanceof TimeoutException)) return false;
+if (exception.getMessage() != null &&
+exception.getMessage().equals("The controller is shutting 
down.")) {
+return false;
+}
+return true;

Review Comment:
   This seems brittle.  Is there a way we can identify the specific thing we 
are interested in?
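
   One less brittle option, sketched here as an assumption rather than as what the PR does, is to mark shutdown-induced timeouts with a dedicated exception type and classify by type instead of by message text. `ShutdownTimeoutException` is hypothetical, and `java.util.concurrent.TimeoutException` stands in for Kafka's `TimeoutException` so that the example is self-contained.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

class TimeoutClassificationSketch {
    // Hypothetical marker subclass for timeouts caused by controller shutdown.
    static class ShutdownTimeoutException extends TimeoutException {
        ShutdownTimeoutException(String message) {
            super(message);
        }
    }

    // True for an ordinary timeout, false for a shutdown timeout or anything else.
    static boolean isNormalTimeoutException(Throwable exception) {
        if (exception instanceof ExecutionException) {
            exception = exception.getCause();
        }
        // instanceof is false for null, so no separate null checks are needed.
        return exception instanceof TimeoutException
            && !(exception instanceof ShutdownTimeoutException);
    }
}
```

   With this shape, changing the wording of the shutdown message no longer affects the classification.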






[GitHub] [kafka] Hangleton commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-05-30 Thread via GitHub


Hangleton commented on PR #13782:
URL: https://github.com/apache/kafka/pull/13782#issuecomment-1568955091

   Many thanks for the patch and the collected data! Really interesting to see 
the impact of this change. A few questions: 
   
   - What storage device and file system are used in the test?
   - Would you have a real-life workload where the impact of this change can be 
quantified? The workload generated by producer-perf-test.sh exhibits the 
problem the most because the segments of all replicas on the brokers start 
rolling at the same time. This is why it would also be interesting to assess the 
impact using topic-partitions that have different ingress rates and/or use 
segments of different sizes.





[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll

2023-05-30 Thread Ruslan Scherbakov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727662#comment-17727662
 ] 

Ruslan Scherbakov commented on KAFKA-9693:
--

Following the comments on the GitHub PR, a similar change was made for trunk: 
[https://github.com/apache/kafka/pull/13782]

Note that the suggested change somewhat mirrors how LocalLog is flushed on 
rolling:

https://github.com/apache/kafka/blob/d557f7997359daa5853d446d9093995577540de2/core/src/main/scala/kafka/log/UnifiedLog.scala#L1493

  def roll(expectedNextOffset: Option[Long] = None): LogSegment = lock synchronized {
    val newSegment = localLog.roll(expectedNextOffset)
    // Take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
    // offset align with the new segment offset since this ensures we can recover the segment by beginning
    // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
    // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
    // we manually override the state offset here prior to taking the snapshot.
    producerStateManager.updateMapEndOffset(newSegment.baseOffset)
    producerStateManager.takeSnapshot()   // <- snapshot is flushed synchronously <- change is similar scheduler.scheduleOnce(...)
    updateHighWatermarkWithLogEndOffset()
    // Schedule an asynchronous flush of the old segment
    scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))   // <- LocalLog is flushed asynchronously
    newSegment
  }

> Kafka latency spikes caused by log segment flush on roll
> 
>
> Key: KAFKA-9693
> URL: https://issues.apache.org/jira/browse/KAFKA-9693
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
> Environment: OS: Amazon Linux 2
> Kafka version: 2.2.1
>Reporter: Paolo Moriello
>Assignee: Paolo Moriello
>Priority: Major
>  Labels: Performance, latency, performance
> Attachments: image-2020-03-10-13-17-34-618.png, 
> image-2020-03-10-14-36-21-807.png, image-2020-03-10-15-00-23-020.png, 
> image-2020-03-10-15-00-54-204.png, image-2020-06-23-12-24-46-548.png, 
> image-2020-06-23-12-24-58-788.png, image-2020-06-26-13-43-21-723.png, 
> image-2020-06-26-13-46-52-861.png, image-2020-06-26-14-06-01-505.png, 
> latency_plot2.png
>
>
> h1. Summary
> When a log segment fills up, Kafka rolls over onto a new active segment and 
> forces the flush of the old segment to disk. When this happens, log segment 
> _append_ duration increases, causing significant latency spikes on producer(s) 
> and replica(s). This ticket aims to highlight the problem and propose a 
> simple mitigation: add a new configuration to enable/disable rolled segment 
> flush.
> h1. 1. Phenomenon
> Response time of produce request (99th ~ 99.9th %ile) repeatedly spikes to 
> ~50x-200x more than usual. For instance, normally 99th %ile is lower than 
> 5ms, but when this issue occurs, it marks 100ms to 200ms. 99.9th and 99.99th 
> %iles even jump to 500-700ms.
> Latency spikes happen at constant frequency (depending on the input 
> throughput), for small amounts of time. All the producers experience a 
> latency increase at the same time.
> h1. !image-2020-03-10-13-17-34-618.png|width=942,height=314!
> {{Example of response time plot observed on a single producer.}}
> URPs occasionally appear alongside the latency spikes too. This is 
> harder to reproduce, but from time to time it is possible to see a few 
> partitions going out of sync when a spike occurs.
> h1. 2. Experiment
> h2. 2.1 Setup
> Kafka cluster hosted on AWS EC2 instances.
> h4. Cluster
>  * 15 Kafka brokers: (EC2 m5.4xlarge)
>  ** Disk: 1100Gb EBS volumes (4750Mbps)
>  ** Network: 10 Gbps
>  ** CPU: 16 Intel Xeon Platinum 8000
>  ** Memory: 64Gb
>  * 3 Zookeeper nodes: m5.large
>  * 6 producers on 6 EC2 instances in the same region
>  * 1 topic, 90 partitions - replication factor=3
> h4. Broker config
> Relevant configurations:
> {quote}num.io.threads=8
>  num.replica.fetchers=2
>  offsets.topic.replication.factor=3
>  num.network.threads=5
>  num.recovery.threads.per.data.dir=2
>  min.insync.replicas=2
>  num.partitions=1
> {quote}
> h4. Perf Test
>  * Throughput ~6000-8000 (~40-70Mb/s input + replication = ~120-210Mb/s per 
> broker)
>  * record size = 2
>  * Acks = 1, linger.ms = 1, compression.type = none
>  * Test duration: ~20/30min
> h2. 2.2 Analysis
> Our analysis showed a high +correlation between log segment flush count/rate 
> and the latency spikes+. This indicates that the spikes in max latency are 
> related to Kafka behavior on rolling over new segments.
> The other metrics did not show any relevant impact 

[GitHub] [kafka] novosibman commented on pull request #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll

2023-05-30 Thread via GitHub


novosibman commented on PR #13768:
URL: https://github.com/apache/kafka/pull/13768#issuecomment-1568910054

   > We typically make changes to master first. Would you be willing to submit 
a PR for that instead?
   
   Prepared and tested trunk version: https://github.com/apache/kafka/pull/13782





[GitHub] [kafka] novosibman opened a new pull request, #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-05-30 Thread via GitHub


novosibman opened a new pull request, #13782:
URL: https://github.com/apache/kafka/pull/13782

   Trunk version of initial change: https://github.com/apache/kafka/pull/13768 
in branch "3.4"
   
   Key difference from the 3.4 branch change:
   The existing `scheduler`, which is already used for flushing large segment 
logs and indices, is passed in and used.
   
   In all cases the snapshot's fileChannel is kept open when it is passed to 
other threads for flushing and closing (hence try-with-resources is removed in 
this change).
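
The hand-off described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the PR: it uses a plain `java.util.concurrent.ExecutorService` in place of Kafka's scheduler, and `writeThenFlushAsync` is a hypothetical helper name. The calling thread writes the data and returns; the background task forces the still-open channel to disk and then closes it.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncFlushSketch {
    // Hypothetical helper: write on the caller's thread, flush and close on the executor.
    static Future<Void> writeThenFlushAsync(Path file, byte[] payload, ExecutorService flusher)
            throws IOException {
        // Deliberately not opened in try-with-resources: the channel must outlive this method.
        FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
        } catch (IOException e) {
            channel.close();
            throw e;
        }
        Callable<Void> flushAndClose = () -> {
            // The background task now owns the channel and closes it after forcing to disk.
            try (FileChannel owned = channel) {
                owned.force(true);
            }
            return null;
        };
        return flusher.submit(flushAndClose);
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("producer-snapshot", ".tmp");
        ExecutorService flusher = Executors.newSingleThreadExecutor();
        try {
            Future<Void> pending = writeThenFlushAsync(file, "state".getBytes(StandardCharsets.UTF_8), flusher);
            pending.get(); // wait here only so the demo can inspect the file
            System.out.println(Files.size(file));
        } finally {
            flusher.shutdown();
            Files.deleteIfExists(file);
        }
    }
}
```

The trade-off in this pattern is that the caller no longer blocks on `force`, but the data is not guaranteed to be durable until the returned future completes.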
   
   Related issue https://issues.apache.org/jira/browse/KAFKA-9693
   
   The issue with repeated latency spikes during Kafka log segment rolling 
is still reproducible on the latest versions, including kafka_2.13-3.4.0.
   
   It was found that flushing the Kafka snapshot file during segment rolling 
blocks the producer request handling thread for some time. The latency 
improvement was reproduced in kafka_2.13-3.6.0-snapshot by offloading the flush 
operation. I used the single-node test configuration available on my side:
kafka_2.13-3.6.0-snapshot - trunk version
kafka_2.13-3.6.0-snapshot-fix - trunk version with the provided change
   
   partitions=10 # rolling roughly every 52 seconds
   
![image](https://github.com/apache/kafka/assets/6793713/6f71a515-36d2-4d10-a577-6a8712c2dbf0)
   
   partitions=100 # rolling events about every 8.5 minutes:
   
![image](https://github.com/apache/kafka/assets/6793713/a7780840-75e2-4fca-b1a6-7fa17cec702c)
   





[jira] [Comment Edited] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-05-30 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725223#comment-17725223
 ] 

Yash Mayya edited comment on KAFKA-15012 at 5/30/23 6:34 PM:
-

Thanks for filing this Jira [~ranjanrao]. Simply enabling the 
_ALLOW_LEADING_ZEROS_FOR_NUMBERS_ feature would likely be a backward incompatible 
change since there could potentially be users relying on the existing behavior 
to send bad data (i.e. some numeric field has leading zeroes when it isn't 
expected to) to a DLQ topic. This might need a small KIP adding a new config 
(maybe {_}allow.leading.zeroes.for.numbers{_}) to the JsonConverter which 
defaults to false in order to be backward compatible. Alternatively, we could 
design a way to allow users to configure the various 
[JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s
 and 
[JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonWriteFeature.html]s
 for the JsonSerializer / JsonDeserializer used in the JsonConverter.
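
The opt-in idea above can be illustrated with a small, dependency-free sketch. Everything in it is an assumption made for illustration: the config key is the one floated in the comment and is not an existing JsonConverter option, and `parseIntegerToken` is a hand-written stand-in for the parser check that Jackson controls through `JsonReadFeature.ALLOW_LEADING_ZEROS_FOR_NUMBERS`.

```java
import java.util.Map;

public class LeadingZeroConfigSketch {
    // Hypothetical config key taken from the suggestion above.
    static final String ALLOW_LEADING_ZEROES_CONFIG = "allow.leading.zeroes.for.numbers";

    static boolean allowLeadingZeroes(Map<String, String> configs) {
        // Defaults to false so existing deployments keep rejecting such input.
        return Boolean.parseBoolean(configs.getOrDefault(ALLOW_LEADING_ZEROES_CONFIG, "false"));
    }

    // Stand-in for the parser's check; not Jackson code.
    static long parseIntegerToken(String token, boolean allowLeadingZeroes) {
        int start = token.startsWith("-") ? 1 : 0;
        boolean hasLeadingZero = token.length() - start > 1 && token.charAt(start) == '0';
        if (hasLeadingZero && !allowLeadingZeroes) {
            throw new IllegalArgumentException("Invalid numeric value: Leading zeroes not allowed");
        }
        return Long.parseLong(token);
    }

    public static void main(String[] args) {
        String token = "00080153032837";
        boolean optedIn = allowLeadingZeroes(Map.of(ALLOW_LEADING_ZEROES_CONFIG, "true"));
        System.out.println(parseIntegerToken(token, optedIn));
        try {
            parseIntegerToken(token, allowLeadingZeroes(Map.of()));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the flag defaults to false, records that fail today would keep failing (and keep reaching a DLQ, if one is configured) unless a user opts in.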


was (Author: yash.mayya):
Thanks for filing this Jira [~ranjanrao]. Simply enabling the 
_ALLOW_LEADING_ZEROS_FOR_NUMBERS_ feature would likely be a backward incompatible 
change since there could potentially be users relying on the existing behavior 
to send bad data (i.e. some numeric field has leading zeroes when it isn't 
expected to) to a DLQ topic. This might need a small KIP adding a new config 
(maybe {_}allow.leading.zeroes.for.numbers{_}) to the JsonConverter which 
defaults to false in order to be backward compatible. Alternatively, we could 
design a way to allow users to configure the various 
[JsonReadFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/json/JsonReadFeature.html]s
 and 
[JsonWriteFeature|https://fasterxml.github.io/jackson-core/javadoc/2.10/com/fasterxml/jackson/core/JsonParser.Feature.html]s
 for the JsonSerializer / JsonDeserializer used in the JsonConverter.


[jira] [Commented] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-05-30 Thread Yash Mayya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727653#comment-17727653
 ] 

Yash Mayya commented on KAFKA-15012:


Thanks [~ChrisEgerton], that's a fair point. I was comparing this to 
https://issues.apache.org/jira/browse/KAFKA-8713 but your point regarding there 
being nothing different about the data given to the sink connectors post 
conversion here makes sense and causes this case to be different from 
[KIP-581|https://cwiki.apache.org/confluence/display/KAFKA/KIP-581%3A+Value+of+optional+null+field+which+has+default+value].
 I agree that this qualifies as a bug in the converter so I think we should go 
ahead with the proposed fix here.

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Priority: Major
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector 
> using JsonConverter fails with the below exception
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
>   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>   ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
> Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid numeric value: Leading zeroes not allowed
>  at [Source: (byte[])"00080153032837"; line: 1, column: 2]
>   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
>   at com.fasterxml.jackson.core.base.ParserMinimalBase.reportInvalidNumber(ParserMinimalBase.java:551)
>   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyNoLeadingZeroes(UTF8StreamJsonParser.java:1520)
>   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1372)
>   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:855)
>   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
>   at 
> 

[GitHub] [kafka] cmccabe commented on pull request #13759: KAFKA-15019: Improve handling of broker heartbeat timeouts

2023-05-30 Thread via GitHub


cmccabe commented on PR #13759:
URL: https://github.com/apache/kafka/pull/13759#issuecomment-1568871804

   Hi all,
   
   I've updated the approach in the PR. The new approach simply adds basic 
heartbeat handling in the timeout case. We can't do everything we normally can 
here. Most notably, anything involving heartbeats causing a state change like 
fenced -> unfenced or entering controlled shutdown is off limits. But it's 
sufficient to prevent the congestion collapse scenario, and it's pretty simple.
   
   I will also create a KIP to add a new metric for timed out broker 
heartbeats, since I think that's very worthwhile to do. We can do that in a 
follow-on PR. This is something we'll want to track pretty closely. If 
heartbeats start timing out that indicates a critical performance problem that 
needs to be addressed by the operator.





[GitHub] [kafka] yashmayya commented on a diff in pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark

2023-05-30 Thread via GitHub


yashmayya commented on code in PR #13776:
URL: https://github.com/apache/kafka/pull/13776#discussion_r1210620016


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java:
##
@@ -94,8 +96,8 @@ public void configure(Map configs) {
 {ConfigName.EXCLUDE, "blacklist"},
 }));
 
-exclude = config.getList(ConfigName.EXCLUDE);
-include = config.getList(ConfigName.INCLUDE);
+exclude = new HashSet<>(config.getList(ConfigName.EXCLUDE));
+include = new HashSet<>(config.getList(ConfigName.INCLUDE));

Review Comment:
   Hm that's an interesting point and I hadn't considered that. However, taking 
a closer look, I don't think there'll be any performance penalty here because 
the `ArrayList::contains` method will involve calls to `String::equals` to 
check whether each value field is present in the exclude / include fields. 
`String::equals` and `String::hashCode` both have linear time complexity in 
terms of the length of the String so there won't be much difference in the 
performance between the `HashSet` version and the `ArrayList` version when 
there's a small number of include / exclude fields. I ran a modified benchmark 
(using a single include field and a single exclude field) and the results were 
fairly similar between the two implementations:
   
   ## Using Strings with length ~10
   
   ```
   With HashSet:
   
   Benchmark                                                  (fieldCount)  Mode  Cnt       Score       Error  Units
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark           100  avgt    5    1034.411 ±   138.769  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark          1000  avgt    5   12210.318 ±   119.902  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark             1  avgt    5  125383.027 ±  3884.894  ns/op
   
   ---
   
   With ArrayList:
   
   Benchmark                                                  (fieldCount)  Mode  Cnt       Score       Error  Units
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark           100  avgt    5     919.979 ±    15.950  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark          1000  avgt    5    8757.357 ±   114.363  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark             1  avgt    5   96112.550 ± 10639.302  ns/op
   ```
   
   ## Using Strings with length ~1000
   
   ```
   With HashSet:
   
   Benchmark                                                  (fieldCount)  Mode  Cnt       Score       Error  Units
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark           100  avgt    5     982.217 ±    16.189  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark          1000  avgt    5   12327.410 ±   657.447  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark             1  avgt    5  162847.616 ± 25125.797  ns/op
   
   ---
   
   With ArrayList:
   
   Benchmark                                                  (fieldCount)  Mode  Cnt       Score       Error  Units
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark           100  avgt    5   16189.095 ±   741.592  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark          1000  avgt    5   24110.247 ±   180.101  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark             1  avgt    5  165554.095 ± 17875.657  ns/op
   ```
   
   Even on attempting to make use of String interning to speed up the equals 
checks, the results were very similar.
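
To make the comparison above concrete, here is a minimal sketch of an include/exclude check written against `Collection<String>`, so the same logic can be run with either an `ArrayList` or a `HashSet`. The `filter` helper and the field names are hypothetical and only loosely modelled on the check discussed in this thread; it is not the ReplaceField implementation or the JMH benchmark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

public class IncludeExcludeLookupSketch {
    // Hypothetical helper: keep a field unless it is excluded or missing from a non-empty include list.
    static List<String> filter(List<String> fieldNames, Collection<String> include, Collection<String> exclude) {
        List<String> kept = new ArrayList<>();
        for (String name : fieldNames) {
            // ArrayList.contains compares with equals() element by element;
            // HashSet.contains hashes the name first and then compares within one bucket.
            if (exclude.contains(name)) {
                continue;
            }
            if (include.isEmpty() || include.contains(name)) {
                kept.add(name);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("id", "name", "password", "email");
        List<String> include = Arrays.asList("id", "name", "password");
        List<String> exclude = Arrays.asList("password");

        System.out.println(filter(fields, include, exclude));
        System.out.println(filter(fields, new HashSet<>(include), new HashSet<>(exclude)));
    }
}
```

Both calls keep the same fields; only the cost of each `contains` call differs, which is what the benchmark numbers above compare.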






[GitHub] [kafka] cadonna commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

2023-05-30 Thread via GitHub


cadonna commented on code in PR #13681:
URL: https://github.com/apache/kafka/pull/13681#discussion_r1210581130


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3137,13 +3102,11 @@ public Set changelogPartitions() {
 }
 };
 
-expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00));
-activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
-expectLastCall();
-activeTaskCreator.closeThreadProducerIfNeeded();
-expectLastCall().andThrow(new RuntimeException("whatever"));
+when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
 expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-replay(activeTaskCreator, standbyTaskCreator);
+expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());

Review Comment:
   I agree with @clolov .



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
 @Test
 public void shouldCommitViaProducerIfEosAlphaEnabled() {
 final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-.andReturn(producer)
-.andReturn(producer);
+when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+.thenReturn(producer)
+.thenReturn(producer);

Review Comment:
   I agree



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4448,10 +4396,10 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
 final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
 
 final StreamsProducer producer = mock(StreamsProducer.class);
-expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-.andReturn(producer)
-.andReturn(producer)
-.andReturn(producer);
+when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+.thenReturn(producer)
+.thenReturn(producer)

Review Comment:
   I agree






[GitHub] [kafka] cmccabe commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-30 Thread via GitHub


cmccabe commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1210644585


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   I'll update it
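
How the three constants in the hunk above might fit together can be sketched as a bounded sliding window of timeout timestamps. This is an assumption-laden illustration, not the code in the PR: the class name, `recordTimeout`, and `overloaded` are hypothetical, and the actual tracking in `BrokerHeartbeatManager` may differ.

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;

public class TimedOutHeartbeatWindowSketch {
    private final int countMax;           // cap on how many timeouts are remembered
    private final long windowNs;          // how long a timeout stays relevant
    private final int overloadThreshold;  // timeouts within the window that signal overload
    private final ArrayDeque<Long> timeoutsNs = new ArrayDeque<>();

    TimedOutHeartbeatWindowSketch(int countMax, long windowNs, int overloadThreshold) {
        this.countMax = countMax;
        this.windowNs = windowNs;
        this.overloadThreshold = overloadThreshold;
    }

    // Record one timed-out heartbeat observed at the given monotonic time.
    void recordTimeout(long nowNs) {
        expire(nowNs);
        if (timeoutsNs.size() == countMax) {
            timeoutsNs.removeFirst(); // drop the oldest entry to stay within the cap
        }
        timeoutsNs.addLast(nowNs);
    }

    // True when enough timeouts fall inside the window ending at nowNs.
    boolean overloaded(long nowNs) {
        expire(nowNs);
        return timeoutsNs.size() >= overloadThreshold;
    }

    private void expire(long nowNs) {
        while (!timeoutsNs.isEmpty() && nowNs - timeoutsNs.peekFirst() > windowNs) {
            timeoutsNs.removeFirst();
        }
    }

    public static void main(String[] args) {
        TimedOutHeartbeatWindowSketch window =
            new TimedOutHeartbeatWindowSketch(1000, TimeUnit.MINUTES.toNanos(5), 3);
        long second = TimeUnit.SECONDS.toNanos(1);
        window.recordTimeout(0L);
        window.recordTimeout(second);
        window.recordTimeout(2 * second);
        System.out.println(window.overloaded(2 * second));
        System.out.println(window.overloaded(TimeUnit.MINUTES.toNanos(6)));
    }
}
```

Timestamps are passed in explicitly so the window can be exercised in tests without a real clock.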






[GitHub] [kafka] yashmayya commented on a diff in pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark

2023-05-30 Thread via GitHub


yashmayya commented on code in PR #13776:
URL: https://github.com/apache/kafka/pull/13776#discussion_r1210628638


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java:
##
@@ -94,8 +96,8 @@ public void configure(Map configs) {
 {ConfigName.EXCLUDE, "blacklist"},
 }));
 
-exclude = config.getList(ConfigName.EXCLUDE);
-include = config.getList(ConfigName.INCLUDE);
+exclude = new HashSet<>(config.getList(ConfigName.EXCLUDE));
+include = new HashSet<>(config.getList(ConfigName.INCLUDE));

Review Comment:
   ~~Only the first call to `String::hashCode` for a string object actually 
does the hash code computation which is cached and subsequent calls just fetch 
the cached value.~~
   
   Edit: The concern was with computing the hash code for value fields so the 
caching isn't relevant here






[GitHub] [kafka] jolshan commented on a diff in pull request #13666: KAFKA-14462; [13/N] CoordinatorEvent and CoordinatorEventProcessor

2023-05-30 Thread via GitHub


jolshan commented on code in PR #13666:
URL: https://github.com/apache/kafka/pull/13666#discussion_r1210611398


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEventProcessor.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * A {{@link CoordinatorEvent}} processor.
+ */
+public interface CoordinatorEventProcessor extends AutoCloseable {
+/**
+ * Enqueues a new {{@link CoordinatorEvent}}.
+ *
+ * @param event The event.
+ * @throws RejectedExecutionException If the event processor. is closed.

Review Comment:
   nit: extra period



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A multithreaded {{@link CoordinatorEvent}} processor which uses a {{@link EventAccumulator}}
+ * which guarantees that events sharing a partition key are not processed concurrently.
+ */
+public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+
+/**
+ * The logger.
+ */
+private final Logger log;
+
+/**
+ * The accumulator.
+ */
+private final EventAccumulator accumulator;
+
+/**
+ * The processing threads.
+ */
+private final List threads;
+
+/**
+ * The lock for protecting access to the resources.
+ */
+private final ReentrantLock lock;
+
+/**
+ * A boolean indicated whether the event processor is shutting down.
+ */
+private volatile boolean shuttingDown;
+
+/**
+ * Constructor.
+ *
+ * @param logContext    The log context.
+ * @param threadPrefix  The thread prefix.
+ * @param numThreads    The number of threads.
+ */
+public MultiThreadedEventProcessor(
+LogContext logContext,
+String threadPrefix,
+int numThreads
+) {
+this.log = logContext.logger(MultiThreadedEventProcessor.class);
+this.shuttingDown = false;
+this.lock = new ReentrantLock();
+this.accumulator = new EventAccumulator<>();
+this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
+new EventProcessorThread(
+threadPrefix + threadId
+)
+).collect(Collectors.toList());
+this.threads.forEach(EventProcessorThread::start);
+}
+
+/**
+ * The event processor thread. The thread pulls events from the
+ * accumulator and runs them.
+ */
+class EventProcessorThread extends Thread {
+private final Logger log;
+
+EventProcessorThread(
+String name
+) {
+super(name);
+log = new LogContext("[" + name + "]: ").logger(EventProcessorThread.class);
+setDaemon(false);
+}
+
+private void handleEvents() {
+while (!shuttingDown) {
+  

[GitHub] [kafka] yashmayya commented on a diff in pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark

2023-05-30 Thread via GitHub


yashmayya commented on code in PR #13776:
URL: https://github.com/apache/kafka/pull/13776#discussion_r1210628638


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java:
##
@@ -94,8 +96,8 @@ public void configure(Map configs) {
 {ConfigName.EXCLUDE, "blacklist"},
 }));
 
-exclude = config.getList(ConfigName.EXCLUDE);
-include = config.getList(ConfigName.INCLUDE);
+exclude = new HashSet<>(config.getList(ConfigName.EXCLUDE));
+include = new HashSet<>(config.getList(ConfigName.INCLUDE));

Review Comment:
   Only the first call to `String::hashCode` for a string object actually does 
the hash code computation which is cached and subsequent calls just fetch the 
cached value.
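
   To illustrate the point about lookups, here is a minimal, self-contained 
sketch (not code from the PR) comparing `List::contains` with 
`HashSet::contains` for an exclude list. The class and method names 
(`ExcludeLookupSketch`, `retain`) are hypothetical and exist only for this 
illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

// Hypothetical illustration; not part of ReplaceField.
class ExcludeLookupSketch {

    // Keeps the fields that are not in `exclude`. With a List, contains() is a
    // linear scan that calls String::equals on each element. With a HashSet it
    // is a hash lookup: String::hashCode (cached in the String after the first
    // call) followed by equals() within the matching bucket.
    static List<String> retain(List<String> fields, Collection<String> exclude) {
        List<String> kept = new ArrayList<>();
        for (String field : fields) {
            if (!exclude.contains(field)) {
                kept.add(field);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("Field-0", "Field-1", "Field-2", "Field-3");
        List<String> excludeList = Arrays.asList("Field-0", "Field-2");
        // Both calls keep the same fields; only the lookup strategy differs.
        System.out.println(retain(fields, excludeList));
        System.out.println(retain(fields, new HashSet<>(excludeList)));
    }
}
```

   Both variants return the same result, so the choice between them is purely 
a question of lookup cost, which is what the benchmark is meant to measure.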






[GitHub] [kafka] yashmayya commented on a diff in pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark

2023-05-30 Thread via GitHub


yashmayya commented on code in PR #13776:
URL: https://github.com/apache/kafka/pull/13776#discussion_r1210622194


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.connect;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.ReplaceField;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+
+/**
+ * This benchmark tests the performance of the {@link ReplaceField} {@link org.apache.kafka.connect.transforms.Transformation SMT}
+ * when configured with a large number of include and exclude fields and applied on a {@link SourceRecord} containing a similarly
+ * large number of fields.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ReplaceFieldBenchmark {
+
+@Param({"100", "1000", "1"})
+private int fieldCount;
+private ReplaceField replaceFieldSmt;
+private SourceRecord record;
+
+@Setup
+public void setup() {
+this.replaceFieldSmt = new ReplaceField.Value<>();
+Map replaceFieldConfigs = new HashMap<>();
+replaceFieldConfigs.put("exclude",
+IntStream.range(0, fieldCount).filter(x -> (x & 1) == 0).mapToObj(x -> "Field-" + x).collect(Collectors.joining(",")));
+replaceFieldConfigs.put("include",
+IntStream.range(0, fieldCount).filter(x -> (x & 1) == 1).mapToObj(x -> "Field-" + x).collect(Collectors.joining(",")));

Review Comment:
   Given the above observations, do you feel like this is still required?



##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java:
##
@@ -94,8 +96,8 @@ public void configure(Map configs) {
 {ConfigName.EXCLUDE, "blacklist"},
 }));
 
-exclude = config.getList(ConfigName.EXCLUDE);
-include = config.getList(ConfigName.INCLUDE);
+exclude = new HashSet<>(config.getList(ConfigName.EXCLUDE));
+include = new HashSet<>(config.getList(ConfigName.INCLUDE));

Review Comment:
   Hm that's an interesting point and I hadn't considered that. However, taking 
a closer look, I don't think there'll be any performance penalty here because 
the `ArrayList::contains` method will involve calls to `String::equals` to 
check whether each value field is present in the exclude / include fields. 
`String::equals` and `String::hashCode` both have linear time complexity in 
terms of the length of the String so there won't be much difference in the 
performance between the `HashSet` version and the `ArrayList` version when 
there's a small number of include / exclude fields. I ran a modified benchmark 
and the results were fairly similar between the two implementations:
   
   ## Using Strings with length ~10
   
   ```
   With HashSet:
   
   Benchmark                                                  (fieldCount)  Mode  Cnt       Score      Error  Units
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark           100  avgt    5    1034.411 ±  138.769  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark          1000  avgt    5   12210.318 ±  119.902  ns/op
   ReplaceFieldBenchmark.includeExcludeReplaceFieldBenchmark             1  avgt    5  125383.027 ± 3884.894  ns/op
   
   
---
 

[GitHub] [kafka] novosibman commented on pull request #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll

2023-05-30 Thread via GitHub


novosibman commented on PR #13768:
URL: https://github.com/apache/kafka/pull/13768#issuecomment-1568829724

   > @novosibman What is the file system used in your test? OMB default should 
be `xfs` but wondering if it was changed
   
   Yes, I used `xfs`. This fs type was found in the OMB settings. 
   Older experiments showed results about 10x worse in the high percentiles 
when using `ext4`:
   
![image](https://github.com/apache/kafka/assets/6793713/442ac661-53c4-4f23-a364-0e4b6057a3a9)
   
   //  the test is quite short not including Kafka log segments rolling





[GitHub] [kafka] hachikuji commented on a diff in pull request #13780: KAFKA-15039: Reduce logging level to trace in PartitionChangeBuilder.…

2023-05-30 Thread via GitHub


hachikuji commented on code in PR #13780:
URL: https://github.com/apache/kafka/pull/13780#discussion_r1210611871


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -220,13 +220,15 @@ private boolean isValidNewLeader(int replica) {
 private void tryElection(PartitionChangeRecord record) {
 ElectionResult electionResult = electLeader();
 if (electionResult.node != partition.leader) {
-log.debug(
-"Setting new leader for topicId {}, partition {} to {} using 
{} election",
-topicId,
-partitionId,
-electionResult.node,
-electionResult.unclean ? "an unclean" : "a clean"
-);
+if (log.isTraceEnabled()) {
+log.trace(
+"Setting new leader for topicId {}, partition {} to {} 
using {} election",
+topicId,
+partitionId,
+electionResult.node,
+electionResult.unclean ? "an unclean" : "a clean"

Review Comment:
   I wonder if it would make sense to log unclean elections at a higher level?






[GitHub] [kafka] gharris1727 commented on pull request #13467: KAFKA-14863: Hide plugins with invalid constructors during plugin discovery

2023-05-30 Thread via GitHub


gharris1727 commented on PR #13467:
URL: https://github.com/apache/kafka/pull/13467#issuecomment-1568821952

   > Can you describe the impact on users with this change if a plugin with no 
valid constructor is present on the worker?
   
   Suppose you had a Connector without a valid constructor.
   
   1. Before KAFKA-14649, your worker would crash on plugin path scanning with 
an opaque error.
   2. After KAFKA-14649, the worker logs an informative error, hides the plugin 
with a 404 in the REST API and throws ClassNotFoundExceptions for existing 
connector instances.
   
   Suppose you had a Converter without a valid constructor.
   
   1. Before KAFKA-14863 (this change), the worker would not crash or log an 
error. The plugin would be visible in the REST API, and would throw an opaque 
NoSuchMethodException if used in existing connectors.
   2. After KAFKA-14863 (this change), the worker logs an informative error, 
hides the plugin with a 404 in the REST API, and throws ClassNotFoundExceptions 
for existing connectors that use this converter.
   
   > One alternative I'm considering is if we try to make more plugins visible, 
even if they cannot be used, if that ultimately makes it easier for users to 
diagnose issues with them (a stack trace from a REST request is often more 
informative than a message in the worker's logs).
   
   While this is reducing the diagnostic information present in the REST API 
(ClassNotFoundException is less specific than NoSuchMethodException) it is 
making the REST API more consistent (a class that throws ClassNotFoundException 
in validate is also a 404 on the plugins endpoint). I don't think it's 
inappropriate to have users diagnose startup packaging problems with error 
logs, especially when the REST API information before was rather opaque. 
   
   The trouble with keeping the current reflective behavior (showing plugins 
with bad constructors) is that it is difficult to replicate with the 
ServiceLoader. From the ServiceLoader interface, we do not see the 
implementation's class name until after the static initializer and constructor 
have completed without error. If we wished to collect the class name information 
for badly packaged plugins (instead of just hiding them) we'd be required to 
fork the ServiceLoader or depend on error messages to extract class names.
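
   As a rough illustration of what "a valid constructor" means in this 
discussion, the following sketch checks reflectively for a public no-argument 
constructor. It is an assumption-laden example, not the Connect plugin scanner: 
`ConstructorCheckSketch`, `GoodPlugin`, `BadPlugin` and 
`hasPublicNoArgConstructor` are hypothetical names. Only 
`Class::getConstructor` and `NoSuchMethodException` are real JDK APIs.

```java
// Hypothetical illustration; not the actual plugin discovery code.
class ConstructorCheckSketch {

    // A plugin-like class that can be instantiated without arguments.
    static class GoodPlugin {
        public GoodPlugin() { }
    }

    // A plugin-like class whose only constructor requires an argument.
    static class BadPlugin {
        public BadPlugin(String required) { }
    }

    // Returns true if the class declares a public no-argument constructor.
    // Class::getConstructor only finds public constructors and throws
    // NoSuchMethodException when none matches the given parameter types.
    static boolean hasPublicNoArgConstructor(Class<?> klass) {
        try {
            klass.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPublicNoArgConstructor(GoodPlugin.class));
        System.out.println(hasPublicNoArgConstructor(BadPlugin.class));
    }
}
```

   With reflective scanning the class is already in hand, so a check like this 
can name the offending class. With the ServiceLoader, as noted above, the class 
name is not available when instantiation fails.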
   
   > Also, can you shed some light on the implications this has for 
[KIP-898](https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery)?
 For example, will the behavior of the migration script change?
   
   1. Without KAFKA-14863 (this change), the migration script produces an 
"incomplete" migration. Even if the migration script generated an appropriate 
ServiceLoader manifest, the plugin would appear as if no migration took place. 
Assertions that all plugins were migrated would fail.
   2. With KAFKA-14863 (this change), the migration script hides and does not 
migrate plugins which are missing a valid constructor. After a migration is 
complete, all plugins which are present on the REST API before migration are 
present after, so assertions that all plugins were migrated would pass.
   
   This is the core motivation of this PR: the reflective scanning and 
ServiceLoader scanning error handling should have parity, so that plugins which 
are not usable with the ServiceLoader aren't migrated, and the migration script 
still includes all of the plugins which are visible in the REST API. If we drop 
one of these two properties, then the migration looks incomplete to the user.
   
   I examined some plugin implementations out in the wild, and observed that 
plugins which are packaged and don't have a valid constructor often appear in 
the REST API accidentally. For example, many packaged SMTs have concrete base 
classes without a valid constructor, intending instead to be referenced via 
their `$Key` or `$Value` subclasses. This change is de-cluttering the REST API 
of all of these unintentionally-concrete classes, each of which would cause 
poor migration behavior.





[jira] [Commented] (KAFKA-15012) JsonConverter fails when there are leading Zeros in a field

2023-05-30 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727631#comment-17727631
 ] 

Chris Egerton commented on KAFKA-15012:
---

[~yash.mayya] I'm wondering if compatibility is necessarily a concern here. The 
logic this ticket is concerned with is deserialization of data already present 
in Kafka, and converting it to the Kafka Connect format for use by a sink 
connector. There will be nothing different about the data given to those 
connectors depending on whether the JSON in the upstream Kafka record's 
key/value contains a leading zero or not.

As a result, it's hard to think of a reason for those records in Kafka to 
qualify as "invalid" and for there to be any reason for them to land in the DLQ 
besides there being a bug in the converter.

If we agree that this is a bug in the converter, then even if it causes records 
to be sent to the DLQ, that is a result of the DLQ mechanism being a catch-all 
for errors–expected or unexpected–that occur in the connector's data pipeline, 
and fixing those errors should not be considered a breaking change as long as 
they do not lead to unexpected behavior in the connector (which, in this case, 
should be fine).

 

[~ranjanrao] Thanks for filing this ticket. We don't accept patch requests over 
Jira, but if you'd like to submit a pull request on GitHub (preferably with a 
unit test added to verify behavior and prevent regression), I'd be happy to 
review (as long as the discussion on compatibility brought up by Yash can be 
addressed).

 

I'll also note that the idea of a general-purpose KIP to allow users to 
configure arbitrary features for the JsonConverter's underlying (de)serializers 
is fascinating and may be worth pursuing if there are other valuable use cases 
(perhaps skipping over comments could be useful, for example?).

> JsonConverter fails when there are leading Zeros in a field
> ---
>
> Key: KAFKA-15012
> URL: https://issues.apache.org/jira/browse/KAFKA-15012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Ranjan Rao
>Priority: Major
> Attachments: 
> enable_ALLOW_LEADING_ZEROS_FOR_NUMBERS_in_jackson_object_mapper_.patch
>
>
> When there are leading zeros in a field in the Kafka record, a sink connector 
> using JsonConverter fails with the below exception
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] 
> to Kafka Connect data failed due to serialization error: 
>   at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:324)
>   at 
> org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:531)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:494)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
>   ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: 
> 

[GitHub] [kafka] dajac commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-30 Thread via GitHub


dajac commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1568784867

   @erikvanoosten I am sorry but I haven't had the time to get to it yet. Will 
do!





[GitHub] [kafka] dajac commented on a diff in pull request #13770: MINOR: Add config to producerStateManager config

2023-05-30 Thread via GitHub


dajac commented on code in PR #13770:
URL: https://github.com/apache/kafka/pull/13770#discussion_r1210559849


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java:
##
@@ -24,8 +24,11 @@ public class ProducerStateManagerConfig {
 public static final Set RECONFIGURABLE_CONFIGS = 
Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
 private volatile int producerIdExpirationMs;
 
-public ProducerStateManagerConfig(int producerIdExpirationMs) {
+private volatile boolean transactionVerificationEnabled;

Review Comment:
   In this case, the `volatile` is likely not required if we don't update it.
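
   A minimal sketch of the distinction, assuming the new flag is only ever set 
in the constructor (the class name `ProducerStateConfigSketch` and its accessors 
are hypothetical, not the actual `ProducerStateManagerConfig`):

```java
// Hypothetical illustration; not the actual ProducerStateManagerConfig.
class ProducerStateConfigSketch {

    // Reconfigurable at runtime, so updates must be visible to reader threads.
    private volatile int producerIdExpirationMs;

    // Assigned once in the constructor and never updated. A final field is
    // visible to other threads once construction completes (provided `this`
    // does not escape the constructor), so volatile is not needed here.
    private final boolean transactionVerificationEnabled;

    ProducerStateConfigSketch(int producerIdExpirationMs, boolean transactionVerificationEnabled) {
        this.producerIdExpirationMs = producerIdExpirationMs;
        this.transactionVerificationEnabled = transactionVerificationEnabled;
    }

    void setProducerIdExpirationMs(int producerIdExpirationMs) {
        this.producerIdExpirationMs = producerIdExpirationMs;
    }

    int producerIdExpirationMs() {
        return producerIdExpirationMs;
    }

    boolean transactionVerificationEnabled() {
        return transactionVerificationEnabled;
    }

    public static void main(String[] args) {
        ProducerStateConfigSketch config = new ProducerStateConfigSketch(1000, true);
        config.setProducerIdExpirationMs(2000);
        System.out.println(config.producerIdExpirationMs());
        System.out.println(config.transactionVerificationEnabled());
    }
}
```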






[GitHub] [kafka] zhangsm6 commented on pull request #13768: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll

2023-05-30 Thread via GitHub


zhangsm6 commented on PR #13768:
URL: https://github.com/apache/kafka/pull/13768#issuecomment-1568759449

   @novosibman What is the file system used in your test? OMB default should be 
`xfs` but wondering if it was changed





[GitHub] [kafka] jolshan commented on a diff in pull request #13770: MINOR: Add config to producerStateManager config

2023-05-30 Thread via GitHub


jolshan commented on code in PR #13770:
URL: https://github.com/apache/kafka/pull/13770#discussion_r1210529153


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java:
##
@@ -24,8 +24,11 @@ public class ProducerStateManagerConfig {
 public static final Set RECONFIGURABLE_CONFIGS = 
Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
 private volatile int producerIdExpirationMs;
 
-public ProducerStateManagerConfig(int producerIdExpirationMs) {
+private volatile boolean transactionVerificationEnabled;

Review Comment:
   I considered this, but the issue is that we currently start up a thread in 
startup to handle verification. I'm not sure we can do this dynamically.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java:
##
@@ -24,8 +24,11 @@ public class ProducerStateManagerConfig {
 public static final Set RECONFIGURABLE_CONFIGS = 
Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
 private volatile int producerIdExpirationMs;
 
-public ProducerStateManagerConfig(int producerIdExpirationMs) {
+private volatile boolean transactionVerificationEnabled;

Review Comment:
   can do






[GitHub] [kafka] C0urante commented on a diff in pull request #13776: KAFKA-15034: Improve performance of the ReplaceField SMT; add JMH benchmark

2023-05-30 Thread via GitHub


C0urante commented on code in PR #13776:
URL: https://github.com/apache/kafka/pull/13776#discussion_r1210481491


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.connect;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.ReplaceField;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;

Review Comment:
   (To match the suggestion below RE an explicit setup level)
   ```suggestion
   import org.openjdk.jmh.annotations.Fork;
   import org.openjdk.jmh.annotations.Level;
   ```



##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.connect;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.ReplaceField;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+
+/**
+ * This benchmark tests the performance of the {@link ReplaceField} {@link org.apache.kafka.connect.transforms.Transformation SMT}
+ * when configured with a large number of include and exclude fields and applied on a {@link SourceRecord} containing a similarly
+ * large number of fields.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class ReplaceFieldBenchmark {
+
+@Param({"100", "1000", "1"})
+private int fieldCount;
+private ReplaceField replaceFieldSmt;
+private SourceRecord record;
+
+@Setup

Review Comment:
   In most other benchmarks we're explicit about the setup level, even if it 
matches the default:
   ```suggestion
   @Setup(Level.Trial)
   ```



##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java:
##
@@ -94,8 +96,8 @@ public void configure(Map configs) {
 {ConfigName.EXCLUDE, "blacklist"},
 }));
 
-exclude = config.getList(ConfigName.EXCLUDE);
-include = config.getList(ConfigName.INCLUDE);
+exclude = new HashSet<>(config.getList(ConfigName.EXCLUDE));
+include = new HashSet<>(config.getList(ConfigName.INCLUDE));

Review Comment:
   Is it possible that there's a performance penalty for this approach when the 
transform has been configured with a small number of include/exclude fields 
(e.g., 1), since we'd have to compute the hash code of each field in the value?



##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java:
##
@@ 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-30 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1210489030


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+// Send concurrent generateProducerId requests. Ensure that the generated 
producer id is unique.
+// For each block (total 3 blocks), only "idBlockLen" number of requests 
should go through.
+// All other requests should fail immediately.
+
+val numThreads = 5
+val latch = new CountDownLatch(idBlockLen * 3)
 val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-IntStream.range(0, idBlockLen * 3).forEach { i =>
-  assertEquals(i, manager.generateProducerId())
+val pidMap = mutable.Map[Long, Int]()
+val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+for ( _ <- 0 until numThreads) {
+  requestHandlerThreadPool.submit(() => {
+while(latch.getCount > 0) {
+  val result = manager.generateProducerId()
+  result match {
+case Success(pid) =>
+  pidMap synchronized {
+if (latch.getCount != 0) {
+  val counter = pidMap.getOrElse(pid, 0)
+  pidMap += pid -> (counter + 1)
+  latch.countDown()
+}
+  }
+
+case Failure(exception) =>
+  assertEquals(classOf[CoordinatorLoadInProgressException], 
exception.getClass)
+  }
+  Thread.sleep(100)
+}
+  }, 0)
+}
+assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   updated to 12. Yeah, I've run it several times to make sure it's not flaky






[GitHub] [kafka] rondagostino commented on pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-30 Thread via GitHub


rondagostino commented on PR #13759:
URL: https://github.com/apache/kafka/pull/13759#issuecomment-1568668239

   > The 3 missed heartbeats in your example would result in 3 brokers being 
fenced by the time the overload state is entered? Or did I misunderstand? And 
what happens next?
   
   Using the default values of broker session timeout = 9 seconds and broker 
heartbeat interval = 2 seconds, we would need to miss at least 4 heartbeats 
before any single broker could potentially lose its session.  And with 3 
brokers there would be 3 heartbeats arriving every 2 seconds, so after 8 
seconds we would typically receive 3 * 4 = 12 heartbeats.  If we just missed 3 
of them it is conceivable that they would all have been for the same broker.
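
   The arithmetic in the paragraph above can be written out as a small sketch. 
The class and method names are hypothetical; the inputs (9 second session 
timeout, 2 second heartbeat interval, 3 brokers, 8 second window) are the values 
quoted in this comment.

```java
// Hypothetical illustration of the heartbeat arithmetic; not controller code.
class HeartbeatMathSketch {

    // Number of consecutive heartbeats a broker must miss before its session
    // can expire, assuming heartbeats are sent at a fixed interval.
    static int missedHeartbeatsBeforeExpiry(int sessionTimeoutSec, int heartbeatIntervalSec) {
        return sessionTimeoutSec / heartbeatIntervalSec;
    }

    // Number of heartbeats expected from all brokers within a time window.
    static int expectedHeartbeats(int brokers, int windowSec, int heartbeatIntervalSec) {
        return brokers * (windowSec / heartbeatIntervalSec);
    }

    public static void main(String[] args) {
        System.out.println(missedHeartbeatsBeforeExpiry(9, 2)); // 4
        System.out.println(expectedHeartbeats(3, 8, 2));        // 12
    }
}
```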
   
   But note that Colin states above `I think we may not need the overload state 
I originally wanted to introduce at all. We can simply do some basic processing 
on the heartbeat when we time it out. Specifically, we can update the "last 
seen time" of the broker."`.  I believe what he is saying is that when we see a 
heartbeat request in the event queue and it is old enough such that we "missed" 
it, we can still say that the broker that sent it was alive and able to contact 
us at that time (whenever it was -- it could have been several seconds ago by 
the time we see it).  So while we can't respond to it -- the broker has 
timed-out the request and is no longer waiting for the response -- we can still 
note that the broker was alive at that point.  At first glance this seems like 
a lot better approach than introducing this "overload" state -- as the 
discussion shows, such a state isn't intuitive to reason about.  And I see no 
reason why we can't take the approach of counting a heartbeat that we don't 
respond to as indicating the broker was alive...
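
   A minimal sketch of that idea, assuming a hypothetical `LastSeenTracker` 
(the class, method names, and millisecond units are illustrative and are not 
taken from `BrokerHeartbeatManager`): every heartbeat pulled off the queue 
refreshes the broker's last-seen time, even when it is too old to answer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: record liveness for every heartbeat, answer only fresh ones.
final class LastSeenTracker {
    private final Map<Integer, Long> lastSeenMs = new HashMap<>();
    private final long sessionTimeoutMs;

    LastSeenTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // receivedAtMs: when the request entered the queue; nowMs: when it is processed.
    // Returns true if the broker is presumably still waiting for a response.
    boolean onHeartbeat(int brokerId, long receivedAtMs, long nowMs, long requestTimeoutMs) {
        // The broker was alive when the request arrived, however stale it is now.
        lastSeenMs.merge(brokerId, receivedAtMs, Math::max);
        return nowMs - receivedAtMs < requestTimeoutMs;
    }

    // True once no heartbeat has been seen from the broker for a full session timeout.
    boolean sessionExpired(int brokerId, long nowMs) {
        Long seen = lastSeenMs.get(brokerId);
        return seen == null || nowMs - seen >= sessionTimeoutMs;
    }
}
```

   In this sketch a heartbeat that sat in the queue for 3 seconds gets no 
response, but it still pushes the broker's expiry out from the time it arrived.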





[jira] [Commented] (KAFKA-10897) kafka quota optimization

2023-05-30 Thread Afshin Moazami (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727591#comment-17727591
 ] 

Afshin Moazami commented on KAFKA-10897:


Hey team, 

I am new to the community. 

We are implementing topic-partition quotas in the Salesforce fork of Kafka, where 
you can define the produce/consume byte rate per topic-partition. I believe 
that feature is addressing the third suggestion. 

Is there a process to suggest that change, and maybe review the design?

> kafka quota optimization
> 
>
> Key: KAFKA-10897
> URL: https://issues.apache.org/jira/browse/KAFKA-10897
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients, config, consumer, core
>Affects Versions: 2.7.0
>Reporter: yangyijun
>Assignee: Kahn Cheny
>Priority: Blocker
>
> *1. The current quota dimensions are as follows:*
> {code:java}
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/users//clients/
> /config/users//clients/
> /config/users/
> /config/clients/
> /config/clients/{code}
> *2. Existing problems:*
>  
> {code:java}
> 2.1. The quota dimensions are not fine-grained enough.
> 2.2. When multiple users on the same broker produce and consume a large amount 
> of data at the same time, the sum of all user byte-rate quotas must not exceed 
> the broker's throughput limit if the broker is to run normally.
> 2.3. Even if the combined user rate stays below the broker's limit, if it is 
> concentrated on a few disks and exceeds their read-write capacity, all 
> produce and consume requests will be blocked.
> 2.4. Sometimes only one topic's rate increases sharply under a user, so we 
> only need to limit the topics whose rate increased sharply.
> {code}
>  
> *3. Suggestions for improvement*
> {code:java}
> 3.1. Add the upper limit of single broker quota byte.
> 3.2. Add the upper limit of single disk quota byte on the broker.
> 3.3. Add topic quota dimensions.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-30 Thread via GitHub


rondagostino commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1210468300


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = 
TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   > As a side note, broker session is not 18 seconds. It is 9 seconds.
   
   I was confused by the existence of this, which sets 18 seconds: 
https://github.com/apache/kafka/blob/d557f7997359daa5853d446d9093995577540de2/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java#L77
   
   But the above always gets overridden by the value of 
`broker.session.timeout.ms` in the config, which does default to 9 seconds 
(https://github.com/apache/kafka/blob/d557f7997359daa5853d446d9093995577540de2/core/src/main/scala/kafka/server/KafkaConfig.scala#L83).
   
   So yes, it is 9 seconds as you state above.  I wonder if the default in 
`ClusterControlManager.java` should be changed to match the config's default.
   
   






[GitHub] [kafka] clolov commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-05-30 Thread via GitHub


clolov commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1568651608

   Okay, in principle I agree that cleaning should be restarted, but I do not 
understand why you want to mark the topicPartition as failed
   ```
   override def markPartitionFailed(topicPartition: TopicPartition): Unit = {
 super.markPartitionFailed(topicPartition) <- I DO NOT UNDERSTAND THE 
PURPOSE OF THIS
 info(s"For ReplicaAlterLogDirsThread, may also need to resume log cleaner 
for partition $topicPartition")
   
 replicaMgr.logManager.resumeCleaning(topicPartition)
   }
   ```
   My overarching question is that if we mark a partition as failed regardless 
of whether it or its ReplicaAlterLogDirsThread fails does this not mean that 
leadership will change and no traffic will be served by the replica on this 
broker anyway?





[GitHub] [kafka] jsancio commented on a diff in pull request #13772: MINOR: Add helper util `Snapshots.lastContainedLogTimestamp`

2023-05-30 Thread via GitHub


jsancio commented on code in PR #13772:
URL: https://github.com/apache/kafka/pull/13772#discussion_r1210417483


##
raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java:
##
@@ -151,4 +155,24 @@ public static Path markForDelete(Path logDir, 
OffsetAndEpoch snapshotId) {
 );
 }
 }
+
+public static long lastContainedLogTimestamp(RawSnapshotReader reader) {
+try (RecordsSnapshotReader recordsSnapshotReader =
+ RecordsSnapshotReader.of(
+ reader,
+ new IdentitySerde(),

Review Comment:
   `RecordSerde` objects are reusable. You can add an `INSTANCE` static field 
to `IdentitySerde` and use it here. Take a look at `MetadataRecordSerde` for 
an example.
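
   As a rough illustration of the pattern, with hypothetical stand-in types 
(`SimpleSerde` and `PassThroughSerde` are not Kafka classes and do not mirror 
the real `RecordSerde` interface), a stateless serde can expose one shared 
instance:

```java
// Hypothetical stand-in for a serde interface; not the real RecordSerde API.
interface SimpleSerde<T> {
    byte[] write(T value);
    T read(byte[] bytes);
}

// Stateless, so a single shared instance is safe to reuse everywhere.
final class PassThroughSerde implements SimpleSerde<byte[]> {
    static final PassThroughSerde INSTANCE = new PassThroughSerde();

    private PassThroughSerde() {
        // Private constructor: callers use INSTANCE instead of allocating.
    }

    @Override
    public byte[] write(byte[] value) {
        return value;
    }

    @Override
    public byte[] read(byte[] bytes) {
        return bytes;
    }
}
```

   Callers then pass `PassThroughSerde.INSTANCE` wherever they would have 
written `new PassThroughSerde()`.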



##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -377,19 +375,7 @@ final class KafkaMetadataLog private (
*/
   private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] 
= {
 readSnapshot(snapshotId).asScala.map { reader =>
-  val recordsSnapshotReader = RecordsSnapshotReader.of(
-reader,
-recordSerde,

Review Comment:
   I think you should be able to remove `recordSerde` from the constructor for 
`KafkaMetadataLog`.






[jira] [Commented] (KAFKA-15039) Reduce logging level to trace in PartitionChangeBuilder.tryElection()

2023-05-30 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727555#comment-17727555
 ] 

Divij Vaidya commented on KAFKA-15039:
--

Thank you for the find. Could you please share the CPU profile and attach it to 
this ticket? It would be interesting to compare it with non-kraft profiles such 
as captured in https://issues.apache.org/jira/browse/KAFKA-14633 

> Reduce logging level to trace in PartitionChangeBuilder.tryElection()
> -
>
> Key: KAFKA-15039
> URL: https://issues.apache.org/jira/browse/KAFKA-15039
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 3.6.0
>
>
> A CPU profile in a large cluster showed PartitionChangeBuilder.tryElection() 
> taking significant CPU due to logging.  Decrease the logging statements in 
> that method from debug level to trace to mitigate the impact of this CPU hog 
> under normal operations.





[GitHub] [kafka] rondagostino opened a new pull request, #13780: KAFKA-15039: Reduce logging level to trace in PartitionChangeBuilder.…

2023-05-30 Thread via GitHub


rondagostino opened a new pull request, #13780:
URL: https://github.com/apache/kafka/pull/13780

   …tryElection()
   
   A CPU profile in a large cluster showed PartitionChangeBuilder.tryElection() 
taking significant CPU due to logging. Decrease the logging statements in that 
method from debug level to trace to mitigate the impact of this CPU hog under 
normal operations.
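
   A small sketch of why the level matters, using `java.util.logging` only to 
stay self-contained (Kafka itself logs through a different API, and the class 
and method names here are hypothetical). `FINE` stands in for debug and 
`FINEST` for trace; statements below the configured level never build their 
message.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo: counts how many log messages are actually constructed.
final class LogLevelCostDemo {
    static final AtomicInteger MESSAGES_BUILT = new AtomicInteger();

    // Stands in for an expensive message built on a hot path.
    static String describeElection(int partition) {
        MESSAGES_BUILT.incrementAndGet();
        return "election attempted for partition " + partition;
    }

    // Emits `calls` statements at statementLevel on a logger configured at
    // configuredLevel and returns how many messages were built.
    static int messagesBuilt(Level configuredLevel, Level statementLevel, int calls) {
        Logger logger = Logger.getLogger("LogLevelCostDemo");
        logger.setUseParentHandlers(false); // keep the demo silent
        logger.setLevel(configuredLevel);
        MESSAGES_BUILT.set(0);
        for (int i = 0; i < calls; i++) {
            final int partition = i;
            // The supplier runs only if statementLevel is enabled on this logger.
            logger.log(statementLevel, () -> describeElection(partition));
        }
        return MESSAGES_BUILT.get();
    }
}
```

   With the logger at the debug-like level, the debug-like statements build 
every message while the trace-like statements build none.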
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15039) Reduce logging level to trace in PartitionChangeBuilder.tryElection()

2023-05-30 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-15039:
-

 Summary: Reduce logging level to trace in 
PartitionChangeBuilder.tryElection()
 Key: KAFKA-15039
 URL: https://issues.apache.org/jira/browse/KAFKA-15039
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 3.6.0


A CPU profile in a large cluster showed PartitionChangeBuilder.tryElection() 
taking significant CPU due to logging.  Decrease the logging statements in that 
method from debug level to trace to mitigate the impact of this CPU hog under 
normal operations.





[GitHub] [kafka] divijvaidya commented on pull request #13779: KAFKA-15037: pass remoteLogEnabled to unifiedLog

2023-05-30 Thread via GitHub


divijvaidya commented on PR #13779:
URL: https://github.com/apache/kafka/pull/13779#issuecomment-1568472012

   For point 2, I think @satishd has some refactoring in mind to add more logic 
to UnifiedLog as per 
https://github.com/apache/kafka/pull/13561#discussion_r1181520975 
   
   > all this logic will go to UnifiedLog in future.
   
   Perhaps we can do all the refactoring in one go later, before the 3.6 
release.
   
   





[jira] [Resolved] (KAFKA-14970) Dual write mode testing for SCRAM and Quota

2023-05-30 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano resolved KAFKA-14970.
---
Resolution: Fixed

Committed and merged into 3.5.

> Dual write mode testing for SCRAM and Quota
> ---
>
> Key: KAFKA-14970
> URL: https://issues.apache.org/jira/browse/KAFKA-14970
> Project: Kafka
>  Issue Type: Test
>  Components: kraft
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Blocker
>  Labels: 3.5
>
> SCRAM and Quota are stored together in ZK and we need better testing to 
> validate the dual write mode support for them.
> I will add some additional tests for this.
>  





[GitHub] [kafka] showuon commented on pull request #13779: KAFKA-15037: pass remoteLogEnabled to unifiedLog

2023-05-30 Thread via GitHub


showuon commented on PR #13779:
URL: https://github.com/apache/kafka/pull/13779#issuecomment-1568352508

   @satishd , please take a look. Thanks.





[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll

2023-05-30 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727500#comment-17727500
 ] 

Divij Vaidya commented on KAFKA-9693:
-

Thank you for the analysis.

May I propose an alternative here? The root problem is the synchronous 
nature of the flush API call. Could we instead explore the usage of 
[io_uring|https://unixism.net/loti/what_is_io_uring.html] and push the 
responsibility for async IO down to the OS directly? Admittedly, it would only 
work on newer Linux kernels, but in practice the majority of Kafka 
installations probably run on Linux.

> Kafka latency spikes caused by log segment flush on roll
> 
>
> Key: KAFKA-9693
> URL: https://issues.apache.org/jira/browse/KAFKA-9693
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
> Environment: OS: Amazon Linux 2
> Kafka version: 2.2.1
>Reporter: Paolo Moriello
>Assignee: Paolo Moriello
>Priority: Major
>  Labels: Performance, latency, performance
> Attachments: image-2020-03-10-13-17-34-618.png, 
> image-2020-03-10-14-36-21-807.png, image-2020-03-10-15-00-23-020.png, 
> image-2020-03-10-15-00-54-204.png, image-2020-06-23-12-24-46-548.png, 
> image-2020-06-23-12-24-58-788.png, image-2020-06-26-13-43-21-723.png, 
> image-2020-06-26-13-46-52-861.png, image-2020-06-26-14-06-01-505.png, 
> latency_plot2.png
>
>
> h1. Summary
> When a log segment fills up, Kafka rolls over onto a new active segment and 
> forces the flush of the old segment to disk. When this happens, log segment 
> _append_ duration increases, causing significant latency spikes on producer(s) 
> and replica(s). This ticket aims to highlight the problem and propose a 
> simple mitigation: add a new configuration to enable/disable rolled segment 
> flush.
> h1. 1. Phenomenon
> Response time of produce request (99th ~ 99.9th %ile) repeatedly spikes to 
> ~50x-200x more than usual. For instance, normally 99th %ile is lower than 
> 5ms, but when this issue occurs, it marks 100ms to 200ms. 99.9th and 99.99th 
> %iles even jump to 500-700ms.
> Latency spikes happen at constant frequency (depending on the input 
> throughput), for small amounts of time. All the producers experience a 
> latency increase at the same time.
> h1. !image-2020-03-10-13-17-34-618.png|width=942,height=314!
> {{Example of response time plot observed on a single producer.}}
> URPs rarely appear in correspondence of the latency spikes too. This is 
> harder to reproduce, but from time to time it is possible to see a few 
> partitions going out of sync in correspondence of a spike.
> h1. 2. Experiment
> h2. 2.1 Setup
> Kafka cluster hosted on AWS EC2 instances.
> h4. Cluster
>  * 15 Kafka brokers: (EC2 m5.4xlarge)
>  ** Disk: 1100Gb EBS volumes (4750Mbps)
>  ** Network: 10 Gbps
>  ** CPU: 16 Intel Xeon Platinum 8000
>  ** Memory: 64Gb
>  * 3 Zookeeper nodes: m5.large
>  * 6 producers on 6 EC2 instances in the same region
>  * 1 topic, 90 partitions - replication factor=3
> h4. Broker config
> Relevant configurations:
> {quote}num.io.threads=8
>  num.replica.fetchers=2
>  offsets.topic.replication.factor=3
>  num.network.threads=5
>  num.recovery.threads.per.data.dir=2
>  min.insync.replicas=2
>  num.partitions=1
> {quote}
> h4. Perf Test
>  * Throughput ~6000-8000 (~40-70Mb/s input + replication = ~120-210Mb/s per 
> broker)
>  * record size = 2
>  * Acks = 1, linger.ms = 1, compression.type = none
>  * Test duration: ~20/30min
> h2. 2.2 Analysis
> Our analysis showed a high +correlation between log segment flush count/rate 
> and the latency spikes+. This indicates that the spikes in max latency are 
> related to Kafka behavior on rolling over new segments.
> The other metrics did not show any relevant impact on any hardware component 
> of the cluster, eg. cpu, memory, network traffic, disk throughput...
>  
>  !latency_plot2.png|width=924,height=308!
>  {{Correlation between latency spikes and log segment flush count. p50, p95, 
> p99, p999 and p latencies (left axis, ns) and the flush #count (right 
> axis, stepping blue line in plot).}}
> Kafka schedules logs flushing (this includes flushing the file record 
> containing log entries, the offset index, the timestamp index and the 
> transaction index) during _roll_ operations. A log is rolled over onto a new 
> empty log when:
>  * the log segment is full
>  * the maxtime has elapsed since the timestamp of first message in the 
> segment (or, in absence of it, since the create time)
>  * the index is full
> In this case, the increase in latency happens on _append_ of a new message 
> set to the active segment of the log. This is a synchronous operation which 
> therefore blocks producers requests, causing the latency increase.
> To confirm this, I 
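
The three roll conditions listed in the quoted description can be sketched as a 
single predicate. This is a simplified, hypothetical model (the class, method, 
and parameter names are illustrative and the real broker logic differs in 
detail); the caller passes the segment create time when no message timestamp 
is available.

```java
// Hypothetical sketch of the roll conditions described in the ticket.
final class RollDecision {
    static boolean shouldRoll(long segmentBytes, long maxSegmentBytes,
                              long firstTimestampMs, long nowMs, long maxSegmentMs,
                              boolean indexFull) {
        boolean segmentFull = segmentBytes >= maxSegmentBytes;
        boolean maxTimeElapsed = nowMs - firstTimestampMs >= maxSegmentMs;
        // Any one condition triggers a roll. Per the ticket, the roll also
        // schedules a flush of the old segment, and the append that triggered
        // it waits for that flush.
        return segmentFull || maxTimeElapsed || indexFull;
    }
}
```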

[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-05-30 Thread via GitHub


Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1210141583


##
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
* The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
*/
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
 val group = groupMetadataCache.get(groupId)
 if (group == null) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
   group.inLock {
 if (group.is(Dead)) {
-  topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+  topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
 val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
   Optional.empty(), "", Errors.NONE)
-topicPartition -> partitionData
+topicIdPartition -> partitionData
   }.toMap
 } else {
-  val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-  topicPartitions.map { topicPartition =>
-if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-  topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+  def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
 } else {
-  val partitionData = group.offset(topicPartition) match {
+  group.offset(topicIdPartition) match {
 case None =>
   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
 Optional.empty(), "", Errors.NONE)
 case Some(offsetAndMetadata) =>
   new PartitionData(offsetAndMetadata.offset,
 offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
   }
-  topicPartition -> partitionData
 }
-  }.toMap
+  }
+
+  topicIdPartitionsOpt match {
+case Some(topicIdPartitions) =>
+  topicIdPartitions.map { topicIdPartition =>
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  }.toMap
+
+case None =>
+  val topicIds = replicaManager.metadataCache.topicNamesToIds()
+  group.allOffsets.keySet.map { topicPartition =>
+Option(topicIds.get(topicPartition.topic())) match {
+  case Some(topicId) =>
+val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+topicIdPartition -> resolvePartitionData(topicIdPartition)
+  case None =>
+val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+}
+  }.toMap

Review Comment:
   Sure, Justine. Will get back with a diagram.






[jira] [Updated] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in RLM

2023-05-30 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15038:
--
Component/s: core

> Use topic id/name mapping from the Metadata cache in RLM
> 
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removing the 
> mapping upon receiving the notification of partition stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache thereby 
> duplicating the information in the metadata cache. It also allows to preserve 
> a single source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138





[jira] [Updated] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in the RemoteLogManager

2023-05-30 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15038:
--
Summary: Use topic id/name mapping from the Metadata cache in the 
RemoteLogManager  (was: Use topic id/name mapping from the Metadata cache in 
RLM)

> Use topic id/name mapping from the Metadata cache in the RemoteLogManager
> -
>
> Key: KAFKA-15038
> URL: https://issues.apache.org/jira/browse/KAFKA-15038
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Minor
>
> Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
> topic id 
> [[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
>  using the information provided during leadership changes, and removing the 
> mapping upon receiving the notification of partition stopped.
> It should be possible to re-use the mapping in a broker's metadata cache, 
> removing the need for the RLM to build and update a local cache thereby 
> duplicating the information in the metadata cache. It also allows to preserve 
> a single source of authority regarding the association between topic names 
> and ids.
> [1] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138





[jira] [Created] (KAFKA-15038) Use topic id/name mapping from the Metadata cache in RLM

2023-05-30 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-15038:
-

 Summary: Use topic id/name mapping from the Metadata cache in RLM
 Key: KAFKA-15038
 URL: https://issues.apache.org/jira/browse/KAFKA-15038
 Project: Kafka
  Issue Type: Sub-task
Reporter: Alexandre Dupriez
Assignee: Alexandre Dupriez


Currently, the {{RemoteLogManager}} maintains its own cache of topic name to 
topic id 
[[1]|https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138]
 using the information provided during leadership changes, and removing the 
mapping upon receiving the notification of partition stopped.

It should be possible to re-use the mapping in a broker's metadata cache, 
removing the need for the RLM to build and update a local cache thereby 
duplicating the information in the metadata cache. It also helps preserve a 
single source of authority regarding the association between topic names and 
ids.

[1] 
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L138
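
A minimal sketch of the proposed direction, using hypothetical stand-in classes 
(`SharedMetadataCache` and `RemoteLogComponent` are illustrative names and do 
not reflect the real metadata cache or `RemoteLogManager` APIs): the component 
keeps no private map and resolves topic ids through the shared cache.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker-wide topic name to id mapping.
final class SharedMetadataCache {
    private final Map<String, UUID> topicIds = new ConcurrentHashMap<>();

    void put(String topic, UUID id) {
        topicIds.put(topic, id);
    }

    void remove(String topic) {
        topicIds.remove(topic);
    }

    Optional<UUID> topicId(String topic) {
        return Optional.ofNullable(topicIds.get(topic));
    }
}

// Hypothetical consumer of the mapping: it delegates every lookup, so there is
// no second copy to keep in sync on leadership changes or partition stops.
final class RemoteLogComponent {
    private final SharedMetadataCache metadataCache;

    RemoteLogComponent(SharedMetadataCache metadataCache) {
        this.metadataCache = metadataCache;
    }

    Optional<UUID> topicIdFor(String topic) {
        return metadataCache.topicId(topic);
    }
}
```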





[GitHub] [kafka] showuon commented on a diff in pull request #13779: KAFKA-15037: pass remoteLogEnabled to unifiedLog

2023-05-30 Thread via GitHub


showuon commented on code in PR #13779:
URL: https://github.com/apache/kafka/pull/13779#discussion_r1210048015


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -159,7 +159,7 @@ public void run() {
 while (!closing) {
 maybeWaitForPartitionsAssignment();
 
-log.info("Polling consumer to receive remote log metadata 
topic records");
+log.trace("Polling consumer to receive remote log metadata 
topic records");

Review Comment:
   Logging this line on each consumer poll will flood the log. Change to trace 
level






[GitHub] [kafka] showuon opened a new pull request, #13779: KAFKA-15037: pass remoteLogEnabled to unifiedLog

2023-05-30 Thread via GitHub


showuon opened a new pull request, #13779:
URL: https://github.com/apache/kafka/pull/13779

   UnifiedLog relied on the `remoteStorageSystemEnable` to identify if the 
broker is enabling remote storage, but we never pass this value from the config 
into UnifiedLog. So it'll always be false.
   
   In this PR, I did:
   1. pass `remoteStorageSystemEnable` to `UnifiedLog`
   2. remove `remoteLogManager` from the class member of `UnifiedLog` since 
only `UnifiedLog#fetchOffsetByTimestamp` needs `remoteLogManager`, and this can 
be passed when called from `ReplicaManager`.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15037) initialize unifiedLog with remoteStorageSystemEnable correctly

2023-05-30 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15037:
-

 Summary: initialize unifiedLog with remoteStorageSystemEnable 
correctly
 Key: KAFKA-15037
 URL: https://issues.apache.org/jira/browse/KAFKA-15037
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen
Assignee: Luke Chen


UnifiedLog relied on the `remoteStorageSystemEnable` to identify if the broker 
is enabling remote storage, but we never pass this value from the config into 
UnifiedLog. So it'll always be false.





[jira] [Commented] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2023-05-30 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727444#comment-17727444
 ] 

Sagar Rao commented on KAFKA-15018:
---

Thanks [~yash.mayya] . I have assigned this to myself. I went through your 2 
approaches and feel that option 1 (the option also mentioned by Chris) might be 
the cleaner one to handle this case. While the second option is viable, it 
would need some more changes to have it incorporated correctly- as you pointed 
out.

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.
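
The failure mode described above can be reproduced with two in-memory maps. 
This is a hypothetical model only (the class and method names are illustrative 
and the Connect worker's real offset storage is more involved): lookups prefer 
the per-connector store and fall back to the global one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-connector / global offsets fallback.
final class OffsetFallbackDemo {
    // Per-connector offsets take precedence; the global store is the fallback.
    static String lookup(Map<String, String> perConnector,
                         Map<String, String> global,
                         String partition) {
        String offset = perConnector.get(partition);
        return offset != null ? offset : global.get(partition);
    }

    // Writes a tombstone for "p0" and returns what a task would read afterwards.
    static String readAfterTombstone(boolean globalWriteSucceeds) {
        Map<String, String> perConnector = new HashMap<>();
        Map<String, String> global = new HashMap<>();
        perConnector.put("p0", "42");
        global.put("p0", "42");

        perConnector.remove("p0");   // tombstone lands in the per-connector store
        if (globalWriteSucceeds) {
            global.remove("p0");     // best-effort second write
        }
        return lookup(perConnector, global, "p0");
    }

    public static void main(String[] args) {
        System.out.println(readAfterTombstone(true));
        System.out.println(readAfterTombstone(false));
    }
}
```

When the second write fails, the lookup returns the old offset from the global 
store instead of reporting that no offset exists.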



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on pull request #13764: KAFKA-14691; [1/N] Add new fields to OffsetFetchRequest and OffsetFetchResponse

2023-05-30 Thread via GitHub


clolov commented on PR #13764:
URL: https://github.com/apache/kafka/pull/13764#issuecomment-1568117181

   > I have a few comments/questions:
   > 
   > 1. I am not really comfortable with merging this without the server 
side implementation. @clolov Is there a strong reason to not do them together?
   > 
   > 2. I agree with @Hangleton that it may be better to start with adding 
the TopicId only. This is complicated enough on its own. We can add the other 
fields afterwards.
   > 
   > 3. I agree with @jolshan that we should set `"latestVersionUnstable": 
true` while in development.
   
   I will accommodate 2 and 3 in subsequent commits. There was no reason for 1 
other than splitting the pull request into easier-to-review chunks. I am happy 
to try putting the Request/Response changes + server side changes + tests in 
the same pull request, similar to https://github.com/apache/kafka/pull/13240.
   
   Thank you all for the reviews!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2023-05-30 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15018:
-

Assignee: Sagar Rao






[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted

2023-05-30 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727427#comment-17727427
 ] 

Bruno Cadonna commented on KAFKA-14981:
---

I have the same feeling as [~ableegoldman]. So, we should first ensure that the 
issues are solved, before proceeding with this ticket.

In general, I would welcome Streams using static membership by default.

> Set `group.instance.id` in streams consumer so that rebalance will not happen 
> if a instance is restarted
> 
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is 
> restarted within `session.timeout.ms`, a rebalance will not be triggered and 
> the original assignment can be returned directly from the broker. We can set 
> this id in Kafka Streams using `threadId` so that no rebalance is triggered 
> within `session.timeout.ms`.
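
As a rough illustration of the proposal, the sketch below builds consumer properties with a `group.instance.id` derived from a thread identifier. The config keys are real consumer settings; the helper name, the `applicationId + "-" + threadId` naming scheme, and the timeout value are assumptions made for the example, not what Kafka Streams does.

```java
import java.util.Properties;

// Hypothetical helper; the id naming scheme below is an assumption, not Streams' behavior.
class StaticMembershipConfigSketch {
    static Properties consumerProps(String applicationId, String threadId) {
        Properties props = new Properties();
        props.setProperty("group.id", applicationId);
        // Static membership: the id must be stable across restarts of the same
        // instance and unique within the consumer group.
        props.setProperty("group.instance.id", applicationId + "-" + threadId);
        // A restart that completes within this window should not trigger a rebalance.
        // 45000 is just an example value.
        props.setProperty("session.timeout.ms", "45000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("my-app", "StreamThread-1");
        System.out.println(props.getProperty("group.instance.id")); // prints my-app-StreamThread-1
    }
}
```

Such properties would then be passed to the consumer that each stream thread creates; how the thread id is kept stable across restarts is left open here.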





[jira] [Updated] (KAFKA-15010) KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new controller while in dual write mode.

2023-05-30 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-15010:
---
Fix Version/s: 3.5.0

> KRaft Controller doesn't reconcile with Zookeeper metadata upon becoming new 
> controller while in dual write mode.
> -
>
> Key: KAFKA-15010
> URL: https://issues.apache.org/jira/browse/KAFKA-15010
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Akhilesh Chaganti
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.5.0
>
>
> When a KRaft controller fails over, the existing migration driver (in dual 
> write mode) can fail in between Zookeeper writes and may leave Zookeeper with 
> incomplete and inconsistent data. So when a new controller becomes active 
> (and by extension a new migration driver becomes active), the first thing we 
> should do is load the in-memory snapshot and use it to write metadata to 
> Zookeeper to reach a steady state. We currently do not do this and it may 
> leave Zookeeper in an inconsistent state.





[GitHub] [kafka] soarez commented on a diff in pull request #13777: KAFKA-15036: UnknownServerError on any leader failover

2023-05-30 Thread via GitHub


soarez commented on code in PR #13777:
URL: https://github.com/apache/kafka/pull/13777#discussion_r1209972559


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2028,8 +2028,11 @@ public CompletableFuture 
finalizedFeatures(
 if (lastCommittedOffset == -1) {
 return CompletableFuture.completedFuture(new 
FinalizedControllerFeatures(Collections.emptyMap(), -1));
 }
+// It's possible for a standby controller to receiving 
ApiVersionRequest and we do not have any timeline snapshot

Review Comment:
   ```suggestion
   // It's possible for a standby controller to receive a 
ApiVersionRequest and we do not have any timeline snapshot
   ```






[GitHub] [kafka] akatona84 closed pull request #13733: KAFKA-13337: Plugin loader error handling is done separately, per plugin

2023-05-30 Thread via GitHub


akatona84 closed pull request #13733: KAFKA-13337: Plugin loader error handling 
is done separately, per plugin
URL: https://github.com/apache/kafka/pull/13733





[GitHub] [kafka] akatona84 commented on pull request #13733: KAFKA-13337: Plugin loader error handling is done separately, per plugin

2023-05-30 Thread via GitHub


akatona84 commented on PR #13733:
URL: https://github.com/apache/kafka/pull/13733#issuecomment-1568053208

   PR #13334 addresses my concerns, if I'm not mistaken. It was refactored so 
that it now keeps loading the other plugin dirs when an error pops up in one of 
them. Closing this PR.
   





[GitHub] [kafka] soarez commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

2023-05-30 Thread via GitHub


soarez commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1209954592


##
checkstyle/suppressions.xml:
##
@@ -41,6 +41,8 @@
 
 
+

Review Comment:
   Makes sense. Thanks!






[GitHub] [kafka] dajac commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-05-30 Thread via GitHub


dajac commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1568017910

   > Yes, that is fair too. In that case, the definition of the leadership 
epoch changes to: it represents the version of the leader after a re-election. 
In this case, we should also remove the epoch change during ISR expansion. My 
point is, let's keep the definition as either the state of the ISR (current) or 
the state of the leader (in which case we remove the epoch change for both 
expansion and shrink).
   
   Yeah, I agree that we need to do both in order to remain consistent.
   
   > Aside from that, out of curiosity, is there any other version which 
represents the state of the ISR in Kafka? Does the replica epoch change on 
every change to the ISR?
   
   There is the partition epoch which is incremented whenever the partition is 
updated. This includes ISR changes.
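
To make the distinction concrete, here is a minimal sketch of the semantics being argued for in this thread: the partition epoch moves on every partition update, while the leader epoch moves only on leadership changes. The class and method names are hypothetical and do not mirror Kafka's controller code.

```java
import java.util.Set;

// Hypothetical model of a partition's state; not Kafka's actual classes.
class PartitionEpochSketch {
    int leader;
    int leaderEpoch = 0;
    int partitionEpoch = 0;
    Set<Integer> isr;

    PartitionEpochSketch(int leader, Set<Integer> isr) {
        this.leader = leader;
        this.isr = isr;
    }

    // Any ISR change (shrink or expansion) bumps only the partition epoch.
    void updateIsr(Set<Integer> newIsr) {
        isr = newIsr;
        partitionEpoch++;
    }

    // A leadership change is also a partition update, so both epochs are bumped.
    void electLeader(int newLeader) {
        leader = newLeader;
        leaderEpoch++;
        partitionEpoch++;
    }

    public static void main(String[] args) {
        PartitionEpochSketch p = new PartitionEpochSketch(1, Set.of(1, 2, 3));
        p.updateIsr(Set.of(1, 2)); // ISR shrink: followers keep their leader epoch
        p.electLeader(2);          // leadership change: followers must learn the new epoch
        System.out.println(p.leaderEpoch + " " + p.partitionEpoch); // prints 1 2
    }
}
```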





[GitHub] [kafka] dajac merged pull request #13754: MINOR: Hopefully fix flaky FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor

2023-05-30 Thread via GitHub


dajac merged PR #13754:
URL: https://github.com/apache/kafka/pull/13754





[GitHub] [kafka] dajac commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-05-30 Thread via GitHub


dajac commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1568009841

   > This change also fixes a bug when computing the HWM. When computing the 
HWM, replicas that are not eligible to join the ISR but are caught up should 
not be included in the computation. Otherwise, the HWM will not increase for 
replica.lag.time.max.ms because the shutting down replica is not sending FETCH 
requests. Without this additional fix, PRODUCE requests would time out if the 
request timeout is greater than replica.lag.time.max.ms.
   
   I think that the real issue is in `Partition.makeLeader`. As you can see 
there, we only reset the followers' states when the leader epoch is bumped. I 
suppose that this is why you stumbled upon shutting down replicas holding back 
the HWM. The issue is that the shutting down replica's state is not reset, so 
it remains caught-up for `replica.lag.time.max.ms`. I think that we need to 
update `Partition.makeLeader` to always update the followers' states. 
Obviously, we also need your changes to not consider fenced and shutting down 
replicas in the HWM computation.
   
   > Because of the bug above the KRaft controller needs to check the MV to 
guarantee that all brokers support this bug fix before skipping the leader 
epoch bump.
   
   I wonder if we really need this if we change `Partition.makeLeader` as 
explained. It seems to me that the change in `Partition.makeLeader` and 
`Partition.maybeIncrementLeaderHW` together should be backward compatible. What 
do you think?
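
The HWM rule under discussion can be sketched as follows. This is a simplified model under stated assumptions: the `Follower` type, its fields, and the `highWatermark` helper are hypothetical, and the real logic lives in `Partition.maybeIncrementLeaderHW` with more state than shown here.

```java
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the high watermark candidate computation.
class HighWatermarkSketch {
    static final class Follower {
        final int brokerId;
        final long logEndOffset;
        final boolean caughtUp;       // fetched recently enough (replica.lag.time.max.ms)
        final boolean eligibleForIsr; // false for fenced or shutting down brokers

        Follower(int brokerId, long logEndOffset, boolean caughtUp, boolean eligibleForIsr) {
            this.brokerId = brokerId;
            this.logEndOffset = logEndOffset;
            this.caughtUp = caughtUp;
            this.eligibleForIsr = eligibleForIsr;
        }
    }

    // The candidate HWM is the minimum log end offset over the leader, the ISR
    // members, and the caught-up followers that are still eligible to join the ISR.
    static long highWatermark(long leaderLogEndOffset, List<Follower> followers, Set<Integer> isr) {
        long hw = leaderLogEndOffset;
        for (Follower f : followers) {
            boolean counts = isr.contains(f.brokerId) || (f.caughtUp && f.eligibleForIsr);
            if (counts) {
                hw = Math.min(hw, f.logEndOffset);
            }
        }
        return hw;
    }

    public static void main(String[] args) {
        // Broker 3 is shutting down: removed from the ISR, but its stale state
        // still says "caught up" because the leader epoch was not bumped.
        List<Follower> followers = List.of(
            new Follower(2, 100L, true, true),
            new Follower(3, 90L, true, false));
        System.out.println(highWatermark(100L, followers, Set.of(1, 2))); // prints 100
    }
}
```

Without the eligibility check, broker 3 would hold the result at 90 until its caught-up state expired, which is the stall described in the quoted PR text.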





[GitHub] [kafka] dajac commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-05-30 Thread via GitHub


dajac commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1568001597

   > To achieve the objective that you desire, there is another way without 
changing the definition, i.e. change how the components react when the 
version/epoch changes. We can choose to not restart the fetcher threads on 
each replica when an ISR shrink with a leadership epoch change arrives at it 
for processing.
   
   @divijvaidya This does not help. Restarting the fetcher threads is just a 
means to provide them with the new leader epoch that they have to use. Until 
they get it, they can't replicate. This is the annoying part. If you don't 
restart the fetchers and update the leader epoch "live", you still have that 
period of time during which the followers don't have the correct leader epoch. 
Note that the bump also has an impact on producers/consumers as they have to 
refresh their metadata. Overall, I think that the goal is to only bump the 
leader epoch on leadership changes to avoid all those disturbances.





[GitHub] [kafka] mimaison commented on pull request #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.

2023-05-30 Thread via GitHub


mimaison commented on PR #13748:
URL: https://github.com/apache/kafka/pull/13748#issuecomment-1567999823

   @gharris1727 If @krespo is not able to update the PR, can you open one?





[GitHub] [kafka] divijvaidya commented on pull request #13778: Feature/shuai add comment

2023-05-30 Thread via GitHub


divijvaidya commented on PR #13778:
URL: https://github.com/apache/kafka/pull/13778#issuecomment-1567994161

   Duplicate of https://github.com/apache/kafka/pull/13730 
   
   @DengShuaiSimon As I mentioned in the previous PR, unfortunately, we cannot 
merge this change. As per the ASF code of conduct policy [1], we expect all 
development of Apache projects (including documentation) to take place in the 
English language. You are welcome to submit these changes again in English. 
Please feel free to comment if you have questions.
   
   [1] 
https://www.apache.org/foundation/policies/conduct.html#diversity-statement





[GitHub] [kafka] divijvaidya commented on pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-05-30 Thread via GitHub


divijvaidya commented on PR #13765:
URL: https://github.com/apache/kafka/pull/13765#issuecomment-1567981427

   Hey @jsancio 
   With this change, we are changing the semantics of what a leadership epoch 
means. Prior to this change, the leadership epoch is a version number 
representing the membership of an ISR. As soon as the membership changes, this 
version changes. After this change, the definition becomes: the leadership 
epoch is a version number that represents the membership of an ISR "in some 
cases". As you can see, the new definition adds ifs and buts to the simple 
definition above. Hence, I am not in favour of changing this.
   
   To achieve the objective that you desire, there is another way without 
changing the definition, i.e. change how the components react when the 
version/epoch changes. We can choose to not restart the fetcher threads on 
each replica when an ISR shrink with a leadership epoch change arrives at it 
for processing.
   
   Thoughts?





[GitHub] [kafka] mimaison commented on a diff in pull request #13775: MINOR: update release.py

2023-05-30 Thread via GitHub


mimaison commented on code in PR #13775:
URL: https://github.com/apache/kafka/pull/13775#discussion_r1209862688


##
release.py:
##
@@ -698,7 +700,7 @@ def select_gpg_key():
 
 
 email_contents = """
-To: d...@kafka.apache.org, us...@kafka.apache.org, 
kafka-clie...@googlegroups.com
+To: d...@kafka.apache.org, us...@kafka.apache.org

Review Comment:
   `kafka-clie...@googlegroups.com` exists, see 
https://groups.google.com/u/1/g/kafka-clients
   To send emails you need to be subscribed to the group, it's documented in 
https://cwiki.apache.org/confluence/display/KAFKA/Release+Process
   ```
   You need to be subscribed to 
`[kafka-clie...@googlegroups.com](mailto:kafka-clie...@googlegroups.com)` with 
your apache email address – otherwise it bounces back. Just send a message from 
your apache email to 
[kafka-clients+subscr...@googlegroups.com](mailto:kafka-clients+subscr...@googlegroups.com)
 and click `Join` in the confirmation email
   ```



##
release.py:
##
@@ -684,6 +684,8 @@ def select_gpg_key():
 print("Next, we need to get the Maven artifacts we published into the staging 
repository.")
 # TODO: Can we get this closed via a REST API since we already need to collect 
credentials for this repo?
 print("Go to https://repository.apache.org/#stagingRepositories and hit 
'Close' for the new repository that was created by uploading artifacts.")
+print("There will be more than one repository entries created, please close 
all of them.")
+print("There's a known issue that the repository close will be failed: 
KAFKA-15033.")

Review Comment:
   What about:
   ```
   In some cases, you may get errors on some repositories while closing them, 
see KAFKA-15033.
   ```
   
   I did not see this while running 3.5.0 RC0, but yes I saw it in the past.






[GitHub] [kafka] dajac commented on a diff in pull request #13770: MINOR: Add config to producerStateManager config

2023-05-30 Thread via GitHub


dajac commented on code in PR #13770:
URL: https://github.com/apache/kafka/pull/13770#discussion_r1209855532


##
core/src/main/java/kafka/server/builders/LogManagerBuilder.java:
##
@@ -111,7 +111,12 @@ public LogManagerBuilder setMaxTransactionTimeoutMs(int 
maxTransactionTimeoutMs)
 }
 
 public LogManagerBuilder setMaxProducerIdExpirationMs(int 
maxProducerIdExpirationMs) {
-this.producerStateManagerConfig = new 
ProducerStateManagerConfig(maxProducerIdExpirationMs);
+this.producerStateManagerConfig = new 
ProducerStateManagerConfig(maxProducerIdExpirationMs, false);

Review Comment:
   Should we remove `setMaxProducerIdExpirationMs` and use 
`setProducerStateManagerConfig` everywhere?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java:
##
@@ -24,8 +24,11 @@ public class ProducerStateManagerConfig {
 public static final Set RECONFIGURABLE_CONFIGS = 
Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
 private volatile int producerIdExpirationMs;
 
-public ProducerStateManagerConfig(int producerIdExpirationMs) {
+private volatile boolean transactionVerificationEnabled;

Review Comment:
   small nit: I would put an empty line before `producerIdExpirationMs` and 
remove the one between `producerIdExpirationMs` and 
`transactionVerificationEnabled`.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java:
##
@@ -24,8 +24,11 @@ public class ProducerStateManagerConfig {
 public static final Set RECONFIGURABLE_CONFIGS = 
Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
 private volatile int producerIdExpirationMs;
 
-public ProducerStateManagerConfig(int producerIdExpirationMs) {
+private volatile boolean transactionVerificationEnabled;

Review Comment:
   Do we plan to make it dynamic?






[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-05-30 Thread via GitHub


machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1567839961

   > Overall LGTM. Could we add tests to verify the committed offsets cache 
will be updated when the consumer has committed some offsets? Also, you could 
change to "non-draft" state when you're ready. Thanks.
   
   Thank you @showuon for the review. I was away on a public holiday yesterday. 
I am catching up today and I'll look into adding more tests tomorrow. Once 
that is done, I'll promote the PR, mark it ready for review, and ping you 
then. Cheers!

