[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263961378


##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+/**
+ * True if this exception should be treated as a fault.
+ */
+private final boolean isFault;
+
+/**
+ * True if this exception should cause a controller failover.
+ * All faults cause failover
+ */
+private final boolean causesFailover;
+
+/**
+ * The internal exception.
+ */
+private final Throwable internalException;
+
+/**
+ * The exception to present to RPC callers, or Optional.empty if the internal exception should
+ * be presented directly.
+ */
+private final Optional externalException;
+
+/**
+ * Create an EventHandlerExceptionInfo object from an internal exception.
+ *
+ * @param internal  The internal exception.
+ * @param latestControllerSupplier  A function we can call to obtain the latest leader id.
+ *
+ * @return  The new immutable info object.
+ */
+public static EventHandlerExceptionInfo fromInternal(
+Throwable internal,
+Supplier latestControllerSupplier
+) {
+if (internal instanceof ApiException) {
+// This exception is a standard API error response from the controller, which can pass
+// through without modification.
+return new EventHandlerExceptionInfo(false, false, internal);
+} else if (internal instanceof NotLeaderException) {
+// The controller has lost leadership.
+return new EventHandlerExceptionInfo(false, true, internal,
+ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+} else if (internal instanceof RejectedExecutionException) {
+// The controller event queue is shutting down.
+return new EventHandlerExceptionInfo(false, false, internal,
+new TimeoutException("The controller is shutting down.", internal));
+} else if (internal instanceof BoundedListTooLongException) {
+// The operation could not be performed because it would have created an overly large
+// batch.
+return new EventHandlerExceptionInfo(false, false, internal,
+new PolicyViolationException("Unable to perform excessively large batch " +
+"operation."));
+} else if (internal instanceof UnexpectedEndOffsetException) {
+// The active controller picked the wrong end offset for its next batch. It must now
+// fail over. This should be pretty rare.
+return new EventHandlerExceptionInfo(false, true, internal,
+new NotControllerException("Unexpected end offset. Controller not known."));

Review Comment:
   Yes, "will resign" is ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263962510


##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+/**
+ * True if this exception should be treated as a fault.

Review Comment:
   Sure.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263963588


##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+/**
+ * True if this exception should be treated as a fault.
+ */
+private final boolean isFault;
+
+/**
+ * True if this exception should cause a controller failover.
+ * All faults cause failover
+ */
+private final boolean causesFailover;
+
+/**
+ * The internal exception.
+ */
+private final Throwable internalException;
+
+/**
+ * The exception to present to RPC callers, or Optional.empty if the internal exception should
+ * be presented directly.
+ */
+private final Optional externalException;
+
+/**
+ * Create an EventHandlerExceptionInfo object from an internal exception.
+ *
+ * @param internal  The internal exception.
+ * @param latestControllerSupplier  A function we can call to obtain the latest leader id.
+ *
+ * @return  The new immutable info object.
+ */
+public static EventHandlerExceptionInfo fromInternal(
+Throwable internal,
+Supplier latestControllerSupplier
+) {
+if (internal instanceof ApiException) {
+// This exception is a standard API error response from the controller, which can pass
+// through without modification.
+return new EventHandlerExceptionInfo(false, false, internal);
+} else if (internal instanceof NotLeaderException) {
+// The controller has lost leadership.
+return new EventHandlerExceptionInfo(false, true, internal,
+ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+} else if (internal instanceof RejectedExecutionException) {
+// The controller event queue is shutting down.
+return new EventHandlerExceptionInfo(false, false, internal,
+new TimeoutException("The controller is shutting down.", internal));
+} else if (internal instanceof BoundedListTooLongException) {
+// The operation could not be performed because it would have created an overly large
+// batch.
+return new EventHandlerExceptionInfo(false, false, internal,
+new PolicyViolationException("Unable to perform excessively large batch " +
+"operation."));
+} else if (internal instanceof UnexpectedEndOffsetException) {
+// The active controller picked the wrong end offset for its next batch. It must now
+// fail over. This should be pretty rare.
+return new EventHandlerExceptionInfo(false, true, internal,
+new NotControllerException("Unexpected end offset. Controller not known."));
+} else if (internal instanceof InterruptedException) {
+// The controller event queue has been interrupted. This normally only happens during
+// a JUnit test that has hung. The test framework sometimes sends an InterruptException
+// to all threads to try to get them to shut down. This isn't the correct way to shut
+// the test, but it may happen if someth

[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263966038


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -458,38 +459,32 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
 controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
 }
 
-private Throwable handleEventException(String name,
-   OptionalLong startProcessingTimeNs,
-   Throwable exception) {
-Throwable externalException =
-ControllerExceptions.toExternalException(exception, () -> latestController());
-if (!startProcessingTimeNs.isPresent()) {
-log.error("{}: unable to start processing because of {}. Reason: {}", name,
-exception.getClass().getSimpleName(), exception.getMessage());
-return externalException;
-}
-long endProcessingTime = time.nanoseconds();
-long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
-long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-if (ControllerExceptions.isExpected(exception)) {
-log.info("{}: failed with {} in {} us. Reason: {}", name,
-exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
-return externalException;
+private Throwable handleEventException(
+String name,
+OptionalLong startProcessingTimeNs,
+Throwable exception
+) {
+OptionalLong deltaUs;
+if (startProcessingTimeNs.isPresent()) {
+long endProcessingTime = time.nanoseconds();
+long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
+deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS));
+} else {
+deltaUs = OptionalLong.empty();
+}
+EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
+fromInternal(exception, () -> latestController());
+String failureMessage = info.failureMessage(lastCommittedEpoch, deltaUs,
+isActiveController(), lastCommittedOffset);
+if (info.isFault()) {
+nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception);
+} else {
+log.info("{}: {}", name, failureMessage);

Review Comment:
   In general, if something isn't a fault, it shouldn't be logged at ERROR level.
   
   For example, if someone tries to create a topic but one already exists with that name, that should not cause ERROR messages in the controller.
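
A minimal sketch of the rule above, not the controller's actual code. The names `FaultLoggingSketch`, `ExpectedApiError` and `logLevelFor` are hypothetical and introduced only for illustration, and the sketch assumes, as a simplification, that anything other than an expected caller-caused error counts as a fault. It shows an expected error such as a duplicate topic name mapping to INFO, while an unexpected exception maps to ERROR.

```java
class FaultLoggingSketch {
    /** Hypothetical stand-in for an expected, caller-caused error (e.g. a duplicate topic). */
    static class ExpectedApiError extends RuntimeException {
        ExpectedApiError(String message) {
            super(message);
        }
    }

    /** Simplifying assumption: everything that is not an expected error is a fault. */
    static boolean isFault(Throwable e) {
        return !(e instanceof ExpectedApiError);
    }

    /** Faults are logged at ERROR level; expected failures are logged at INFO level. */
    static String logLevelFor(Throwable e) {
        return isFault(e) ? "ERROR" : "INFO";
    }

    public static void main(String[] args) {
        Throwable duplicateTopic = new ExpectedApiError("Topic 'foo' already exists.");
        Throwable bug = new IllegalStateException("unexpected controller state");
        System.out.println(logLevelFor(duplicateTopic) + ": " + duplicateTopic.getMessage());
        System.out.println(logLevelFor(bug) + ": " + bug.getMessage());
    }
}
```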






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263971924


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -740,22 +735,24 @@ public Long apply(List records) {
 // Start by trying to apply the record to our in-memory state. This should always
 // succeed; if it does not, that's a fatal error. It is important to do this before
 // scheduling the record for Raft replication.
-int i = 1;
+int i = 0;

Review Comment:
   It's just an index. I can call it `recordIndex`.
   
   I will revise the error message a bit too. The clunky wording from before reflected the fact that we didn't really know the offset.



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -973,14 +970,14 @@ public void handleCommit(BatchReader reader) {
 log.debug("Replaying commits from the active node up to " +
 "offset {} and epoch {}.", offset, epoch);
 }
-int i = 1;
+int i = 0;

Review Comment:
   ack






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1264232306


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1076,10 +1073,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
 renounce();
 }
 } else if (newLeader.isLeader(nodeId)) {
-log.info("Becoming the active controller at epoch {}, committed offset {}, " +
-"committed epoch {}", newLeader.epoch(), lastCommittedOffset,
-lastCommittedEpoch);
-claim(newLeader.epoch());
+long newLastWriteOffset = endOffset - 1;

Review Comment:
   The endOffset comes directly from the log and is as described... the end offset (exclusive).
   
   Thinking about it more, I don't think I need to assume that it's committed, so I won't.
   
   But it is used to calculate the next offset that the active controller should try to write to.
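
The arithmetic implied by an exclusive end offset can be sketched as follows. This is an illustration only: the class `EndOffsetSketch` and its method names are hypothetical, and only the expression `endOffset - 1` comes from the diff above.

```java
class EndOffsetSketch {
    /** With an exclusive end offset, the last offset actually written is one less. */
    static long lastWriteOffset(long endOffsetExclusive) {
        return endOffsetExclusive - 1;
    }

    /** The next record appended to the log lands at the exclusive end offset itself. */
    static long nextWriteOffset(long endOffsetExclusive) {
        return lastWriteOffset(endOffsetExclusive) + 1;
    }

    public static void main(String[] args) {
        long endOffset = 100L; // offsets 0..99 are present in the log
        System.out.println("last write offset: " + lastWriteOffset(endOffset));
        System.out.println("next write offset: " + nextWriteOffset(endOffset));
    }
}
```

Under this convention an empty log has end offset 0, so the last write offset comes out as -1 and the first write goes to offset 0.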






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-16 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1264717661


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -740,22 +739,27 @@ public Long apply(List records) {
 // Start by trying to apply the record to our in-memory state. This should always
 // succeed; if it does not, that's a fatal error. It is important to do this before
 // scheduling the record for Raft replication.
-int i = 1;
+int recordIndex = 0;
 for (ApiMessageAndVersion message : records) {
+long recordOffset = prevEndOffset + 1 + recordIndex;
 try {
-replay(message.message(), Optional.empty(), prevEndOffset + records.size());
+replay(message.message(), Optional.empty(), recordOffset);
 } catch (Throwable e) {
-String failureMessage = String.format("Unable to apply %s record, which was " +
-"%d of %d record(s) in the batch following last write offset %d.",
-message.message().getClass().getSimpleName(), i, records.size(),
-prevEndOffset);
+String failureMessage = String.format("Unable to apply %s " +
+"record at offset %d on active controller, from the " +
+"batch with baseOffset %d",
+message.message().getClass().getSimpleName(),
+recordOffset, prevEndOffset + 1);
 throw fatalFaultHandler.handleFault(failureMessage, e);
 }
-i++;
+recordIndex++;
 }
-prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records);
-snapshotRegistry.getOrCreateSnapshot(prevEndOffset);
-return prevEndOffset;
+long nextEndOffset = prevEndOffset + recordIndex;
+raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records);
+snapshotRegistry.getOrCreateSnapshot(nextEndOffset);
+snapshotRegistry.getOrCreateSnapshot(nextEndOffset);

Review Comment:
   Good catch. That was not intentional. Although, given the way that function works, it would have been benign.
   
   Fixed.
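
To illustrate why a repeated get-or-create call is harmless, here is a small sketch. It is not the real `SnapshotRegistry`: the class `GetOrCreateSketch` is hypothetical and simply assumes a registry that behaves like a map keyed by offset, returning the existing entry when one is already present.

```java
import java.util.TreeMap;

class GetOrCreateSketch {
    // Hypothetical registry: one placeholder snapshot object per offset.
    private final TreeMap<Long, Object> snapshots = new TreeMap<>();

    /** Returns the snapshot for this offset, creating it only if it does not exist yet. */
    Object getOrCreate(long offset) {
        return snapshots.computeIfAbsent(offset, key -> new Object());
    }

    int size() {
        return snapshots.size();
    }

    public static void main(String[] args) {
        GetOrCreateSketch registry = new GetOrCreateSketch();
        Object first = registry.getOrCreate(42L);
        Object second = registry.getOrCreate(42L); // duplicate call, as in the diff
        System.out.println("same snapshot: " + (first == second));
        System.out.println("snapshot count: " + registry.size());
    }
}
```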






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267130598


##
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##
@@ -80,8 +81,11 @@ interface Listener {
  * epoch.
  *
  * @param leader the current leader and epoch
+ * @param endOffset the current log end offset (exclusive). This is useful for nodes that
+ *  are claiming leadership, because it lets them know what log offset they
+ *  should attempt to write to next.

Review Comment:
   > What problem are you trying to solve?
   
   We have to know the offset of records that we apply. But we apply records before we submit them to Raft.
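
Because records are applied before they are handed to Raft, their offsets have to be computed up front from the last written offset. The sketch below mirrors the arithmetic used in the `QuorumController` diff earlier in this thread (`prevEndOffset + 1 + recordIndex` per record, and `prevEndOffset + recordIndex` after the loop). The class and method names are hypothetical, and the sketch assumes `prevEndOffset` is the offset of the last record already written.

```java
import java.util.ArrayList;
import java.util.List;

class RecordOffsetSketch {
    /** Offsets the records of the next batch will occupy, in order. */
    static List<Long> offsetsForBatch(long prevEndOffset, int recordCount) {
        List<Long> offsets = new ArrayList<>();
        for (int recordIndex = 0; recordIndex < recordCount; recordIndex++) {
            offsets.add(prevEndOffset + 1 + recordIndex);
        }
        return offsets;
    }

    /** Offset of the last record in the batch, which becomes the new prevEndOffset. */
    static long nextEndOffset(long prevEndOffset, int recordCount) {
        return prevEndOffset + recordCount;
    }

    public static void main(String[] args) {
        long prevEndOffset = 99L; // last offset already written
        int recordCount = 3;      // size of the batch about to be applied
        System.out.println("record offsets: " + offsetsForBatch(prevEndOffset, recordCount));
        System.out.println("next end offset: " + nextEndOffset(prevEndOffset, recordCount));
    }
}
```

If the log does not end where the active controller expected, the offsets computed this way are wrong, which is the situation the `UnexpectedEndOffsetException` branch quoted earlier in the thread handles by forcing a failover.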


