This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new bc90c29faf KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518) bc90c29faf is described below commit bc90c29fafc69747daeecada8bb0c347e138edc8 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Wed Aug 17 18:11:42 2022 -0700 KAFKA-14167; Completion exceptions should not be translated directly to error codes (#12518) There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds`, where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `Update [...] Interestingly, `DeleteTopics` is not affected by this bug, contrary to what I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch moves this logic out of `ApiError.fromThrowable` into a new shared helper, `Errors.maybeUnwrapException`, which `Errors.forException` now applies as well, so that all cases where exceptions are converted to error codes are handled. 
Reviewers: David Arthur <mum...@gmail.com> --- .../org/apache/kafka/common/protocol/Errors.java | 21 ++++- .../requests/AllocateProducerIdsResponse.java | 4 + .../org/apache/kafka/common/requests/ApiError.java | 10 +-- .../apache/kafka/common/requests/ApiErrorTest.java | 6 +- .../main/scala/kafka/server/ControllerApis.scala | 7 +- .../test/junit/RaftClusterInvocationContext.java | 4 +- .../server/AllocateProducerIdsRequestTest.scala | 98 ++++++++++++++++++++++ .../unit/kafka/server/ControllerApisTest.scala | 78 ++++++++++++++++- 8 files changed, 205 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 2ca42bafcf..c220bbcde4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -132,6 +132,8 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.function.Function; /** @@ -469,7 +471,8 @@ public enum Errors { * If there are multiple matches in the class hierarchy, the first match starting from the bottom is used. */ public static Errors forException(Throwable t) { - Class<?> clazz = t.getClass(); + Throwable cause = maybeUnwrapException(t); + Class<?> clazz = cause.getClass(); while (clazz != null) { Errors error = classToError.get(clazz); if (error != null) @@ -479,6 +482,22 @@ public enum Errors { return UNKNOWN_SERVER_ERROR; } + /** + * Check if a Throwable is a commonly wrapped exception type (e.g. `CompletionException`) and return + * the cause if so. This is useful to handle cases where exceptions may be raised from a future or a + * completion stage (as might be the case for requests sent to the controller in `ControllerApis`). 
+ * + * @param t The Throwable to check + * @return The throwable itself or its cause if it is an instance of a commonly wrapped exception type + */ + public static Throwable maybeUnwrapException(Throwable t) { + if (t instanceof CompletionException || t instanceof ExecutionException) { + return t.getCause(); + } else { + return t; + } + } + private static String toHtml() { final StringBuilder b = new StringBuilder(); b.append("<table class=\"data-table\"><tbody>\n"); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java index 5d48c39e80..41db29158e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java @@ -56,6 +56,10 @@ public class AllocateProducerIdsResponse extends AbstractResponse { return data.throttleTimeMs(); } + public Errors error() { + return Errors.forCode(data.errorCode()); + } + public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) { return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData( new ByteBufferAccessor(buffer), version)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java index 01966532d6..dd127fc7a5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java @@ -21,8 +21,6 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.protocol.Errors; import java.util.Objects; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; /** * Encapsulates an error code (via the Errors enum) and an optional message. 
Generally, the optional message is only @@ -38,15 +36,9 @@ public class ApiError { private final String message; public static ApiError fromThrowable(Throwable t) { - Throwable throwableToBeEncoded = t; - // Get the underlying cause for common exception types from the concurrent library. - // This is useful to handle cases where exceptions may be raised from a future or a - // completion stage (as might be the case for requests sent to the controller in `ControllerApis`) - if (t instanceof CompletionException || t instanceof ExecutionException) { - throwableToBeEncoded = t.getCause(); - } // Avoid populating the error message if it's a generic one. Also don't populate error // message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information. + Throwable throwableToBeEncoded = Errors.maybeUnwrapException(t); Errors error = Errors.forException(throwableToBeEncoded); String message = error == Errors.UNKNOWN_SERVER_ERROR || error.message().equals(throwableToBeEncoded.getMessage()) ? 
null : throwableToBeEncoded.getMessage(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java index 8b0aa470be..bf352dbb4a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java @@ -41,10 +41,10 @@ public class ApiErrorTest { @ParameterizedTest @MethodSource("parameters") - public void fromThrowableShouldReturnCorrectError(Throwable t, Errors expectedErrors, String expectedMsg) { + public void fromThrowableShouldReturnCorrectError(Throwable t, Errors expectedError, String expectedMsg) { ApiError apiError = ApiError.fromThrowable(t); - assertEquals(apiError.error(), expectedErrors); - assertEquals(apiError.message(), expectedMsg); + assertEquals(expectedError, apiError.error()); + assertEquals(expectedMsg, apiError.message()); } private static Collection<Arguments> parameters() { diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index efb6a36c3d..511d4b333c 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -20,7 +20,7 @@ package kafka.server import java.util import java.util.{Collections, OptionalLong} import java.util.Map.Entry -import java.util.concurrent.{CompletableFuture, CompletionException} +import java.util.concurrent.CompletableFuture import kafka.network.RequestChannel import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers @@ -117,10 +117,7 @@ class ControllerApis(val requestChannel: RequestChannel, // log the original exception here error(s"Unexpected error handling request ${request.requestDesc(true)} " + s"with context ${request.context}", exception) - - // For building the correct error request, we do need send the "cause" exception - val actualException = if 
(exception.isInstanceOf[CompletionException]) exception.getCause else exception - requestHelper.handleError(request, actualException) + requestHelper.handleError(request, exception) } } } catch { diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 5cd3ec3e24..40669f3068 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -267,11 +267,11 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId)); } - private Stream<BrokerServer> brokers() { + public Stream<BrokerServer> brokers() { return clusterReference.get().brokers().values().stream(); } - private Stream<ControllerServer> controllers() { + public Stream<ControllerServer> controllers() { return clusterReference.get().controllers().values().stream(); } diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala new file mode 100644 index 0000000000..5cb59573d1 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import kafka.network.SocketServer +import kafka.server.{BrokerServer, ControllerServer, IntegrationTestUtils} +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import org.apache.kafka.common.message.AllocateProducerIdsRequestData +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests._ +import org.apache.kafka.server.common.ProducerIdsBlock +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.{Tag, Timeout} + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT) +@Tag("integration") +class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { + + @ClusterTest + def testAllocateProducersIdSentToController(): Unit = { + val raftCluster = cluster.asInstanceOf[RaftClusterInstance] + val sourceBroker = raftCluster.brokers.findFirst().get() + + val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt + val controllerServer = raftCluster.controllers() + .filter(_.config.nodeId == controllerId) + .findFirst() + .get() + + val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker, controllerServer) + assertEquals(Errors.NONE, allocateResponse.error) + assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, 
allocateResponse.data.producerIdLen) + assertTrue(allocateResponse.data.producerIdStart >= 0) + } + + @ClusterTest(controllers = 3) + def testAllocateProducersIdSentToNonController(): Unit = { + val raftCluster = cluster.asInstanceOf[RaftClusterInstance] + val sourceBroker = raftCluster.brokers.findFirst().get() + + val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt + val controllerServer = raftCluster.controllers() + .filter(_.config.nodeId != controllerId) + .findFirst() + .get() + + val allocateResponse = sendAndReceiveAllocateProducerIds(sourceBroker, controllerServer) + assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(allocateResponse.data.errorCode)) + } + + private def sendAndReceiveAllocateProducerIds( + sourceBroker: BrokerServer, + controllerServer: ControllerServer + ): AllocateProducerIdsResponse = { + val allocateRequest = new AllocateProducerIdsRequest.Builder( + new AllocateProducerIdsRequestData() + .setBrokerId(sourceBroker.config.brokerId) + .setBrokerEpoch(sourceBroker.lifecycleManager.brokerEpoch) + ).build() + + connectAndReceive( + controllerServer.socketServer, + allocateRequest + ) + } + + private def connectAndReceive( + controllerSocketServer: SocketServer, + request: AllocateProducerIdsRequest + ): AllocateProducerIdsResponse = { + IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse]( + request, + controllerSocketServer, + cluster.controllerListenerName.get + ) + } + +} diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 0fc9611452..05bd13d795 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -48,9 +48,9 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import 
org.apache.kafka.common.{ElectionType, Uuid} import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT -import org.apache.kafka.controller.{Controller, ControllerRequestContext} +import org.apache.kafka.controller.{Controller, ControllerRequestContext, ResultOrError} import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} -import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -61,7 +61,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import java.net.InetAddress import java.util -import java.util.Collections.singletonList +import java.util.Collections.{singleton, singletonList, singletonMap} import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit} import java.util.concurrent.atomic.AtomicReference import java.util.{Collections, Properties} @@ -876,6 +876,78 @@ class ControllerApisTest { assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(response.data.errorCode)) } + @Test + def testDeleteTopicsReturnsNotController(): Unit = { + val topicId = Uuid.randomUuid() + val topicName = "foo" + val controller = mock(classOf[Controller]) + val controllerApis = createControllerApis(None, controller) + + val findNamesFuture = CompletableFuture.completedFuture( + singletonMap(topicId, new ResultOrError(topicName)) + ) + when(controller.findTopicNames( + any[ControllerRequestContext], + ArgumentMatchers.eq(singleton(topicId)) + )).thenReturn(findNamesFuture) + + val findIdsFuture = CompletableFuture.completedFuture( + Collections.emptyMap[String, ResultOrError[Uuid]]() + ) + when(controller.findTopicIds( + any[ControllerRequestContext], + ArgumentMatchers.eq(Collections.emptySet()) + )).thenReturn(findIdsFuture) + + val deleteFuture = new 
CompletableFuture[util.Map[Uuid, ApiError]]() + deleteFuture.completeExceptionally(new NotControllerException("Controller has moved")) + when(controller.deleteTopics( + any[ControllerRequestContext], + ArgumentMatchers.eq(singleton(topicId)) + )).thenReturn(deleteFuture) + + val request = new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData().setTopics(singletonList( + new DeleteTopicState().setTopicId(topicId) + )) + ).build() + + val response = handleRequest[DeleteTopicsResponse](request, controllerApis) + val topicIdResponse = response.data.responses.asScala.find(_.topicId == topicId).get + assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(topicIdResponse.errorCode)) + } + + @Test + def testAllocateProducerIdsReturnsNotController(): Unit = { + val controller = mock(classOf[Controller]) + val controllerApis = createControllerApis(None, controller) + + // We construct the future here to mimic the logic in `QuorumController.allocateProducerIds`. + // When an exception is raised on the original future, the `thenApply` future is also completed + // exceptionally, but the underlying cause is wrapped in a `CompletionException`. 
+ val future = new CompletableFuture[ProducerIdsBlock] + val thenApplyFuture = future.thenApply[AllocateProducerIdsResponseData] { result => + new AllocateProducerIdsResponseData() + .setProducerIdStart(result.firstProducerId()) + .setProducerIdLen(result.size()) + } + future.completeExceptionally(new NotControllerException("Controller has moved")) + + val request = new AllocateProducerIdsRequest.Builder( + new AllocateProducerIdsRequestData() + .setBrokerId(4) + .setBrokerEpoch(93234) + ).build() + + when(controller.allocateProducerIds( + any[ControllerRequestContext], + ArgumentMatchers.eq(request.data) + )).thenReturn(thenApplyFuture) + + val response = handleRequest[AllocateProducerIdsResponse](request, controllerApis) + assertEquals(Errors.NOT_CONTROLLER, response.error) + } + private def handleRequest[T <: AbstractResponse]( request: AbstractRequest, controllerApis: ControllerApis