MINOR: Consolidate broker request/response handling This patch contains a few small improvements to make request/response handling more consistent. Primarily it consolidates request/response serialization logic so that `SaslServerAuthenticator` and `KafkaApis` follow the same path. It also reduces the amount of custom logic needed to handle unsupported versions of the ApiVersions requests.
Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #3673 from hachikuji/consolidate-response-handling Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4d629a0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4d629a0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4d629a0 Branch: refs/heads/trunk Commit: c4d629a0b3cbd11c174cb8b09a50bc8de77825e9 Parents: 05e3850 Author: Jason Gustafson <ja...@confluent.io> Authored: Fri Aug 25 10:23:11 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Aug 25 10:23:11 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/ClientRequest.java | 2 +- .../org/apache/kafka/clients/NetworkClient.java | 24 +- .../internals/ConsumerNetworkClient.java | 4 +- .../clients/producer/internals/Sender.java | 5 +- .../kafka/common/network/ChannelBuilders.java | 2 +- .../common/network/SaslChannelBuilder.java | 17 +- .../org/apache/kafka/common/network/Send.java | 4 +- .../kafka/common/requests/AbstractRequest.java | 112 +++------ .../kafka/common/requests/AbstractResponse.java | 19 +- .../common/requests/ApiVersionsRequest.java | 27 ++- .../common/requests/ApiVersionsResponse.java | 12 +- .../kafka/common/requests/FetchResponse.java | 21 +- .../kafka/common/requests/RequestContext.java | 92 ++++++++ .../kafka/common/requests/RequestHeader.java | 48 ++-- .../authenticator/SaslClientAuthenticator.java | 8 +- .../authenticator/SaslServerAuthenticator.java | 103 +++++---- .../common/network/SaslChannelBuilderTest.java | 4 +- .../common/requests/RequestContextTest.java | 75 ++++++ .../common/requests/RequestHeaderTest.java | 6 +- .../common/requests/RequestResponseTest.java | 12 +- .../authenticator/SaslAuthenticatorTest.java | 14 +- .../SaslServerAuthenticatorTest.java | 8 +- .../controller/ControllerChannelManager.scala | 2 +- ...nsactionMarkerRequestCompletionHandler.scala | 4 +- .../scala/kafka/network/RequestChannel.scala | 111 ++++----- .../main/scala/kafka/network/SocketServer.scala | 24 +- .../server/ClientRequestQuotaManager.scala | 25 +- .../src/main/scala/kafka/server/KafkaApis.scala | 229 +++++++++---------- .../kafka/server/KafkaRequestHandler.scala | 59 ++--- core/src/main/scala/kafka/utils/Logging.scala | 2 + .../scala/other/kafka/TestOffsetManager.scala | 3 +- .../unit/kafka/admin/AdminRackAwareTest.scala | 5 +- .../admin/ResetConsumerGroupOffsetTest.scala | 9 +- .../TransactionMarkerChannelManagerTest.scala | 11 +- ...tionMarkerRequestCompletionHandlerTest.scala | 30 ++- .../integration/KafkaServerTestHarness.scala | 2 +- .../kafka/integration/PrimitiveApiTest.scala | 3 +- .../kafka/integration/TopicMetadataTest.scala | 2 +- .../ZookeeperConsumerConnectorTest.scala | 1 - .../kafka/log/ProducerStateManagerTest.scala | 2 +- .../unit/kafka/network/SocketServerTest.scala | 64 +++--- .../unit/kafka/server/BaseRequestTest.scala | 2 +- .../unit/kafka/server/FetchRequestTest.scala | 4 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 21 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 2 +- .../unit/kafka/server/LogRecoveryTest.scala | 4 +- .../unit/kafka/server/OffsetCommitTest.scala | 2 +- .../unit/kafka/server/ReplicaFetchTest.scala | 3 - .../kafka/server/ReplicationQuotasTest.scala | 2 +- .../unit/kafka/server/RequestQuotaTest.scala | 4 +- .../unit/kafka/utils/IteratorTemplateTest.scala | 2 +- .../unit/kafka/utils/timer/TimerTest.scala | 3 +- 52 files changed, 685 insertions(+), 571 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java index 1111964..9b62946 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -79,7 +79,7 @@ public final class ClientRequest { } public RequestHeader makeHeader(short version) { - return new RequestHeader(apiKey().id, version, clientId, correlationId); + return new RequestHeader(apiKey(), version, clientId, correlationId); } public AbstractRequest.Builder<?> requestBuilder() { http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 4fe55ae..897cca5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -247,11 +247,11 @@ public class NetworkClient implements KafkaClient { long now = time.milliseconds(); for (InFlightRequest request : inFlightRequests.clearAll(nodeId)) { if (request.isInternalRequest) { - if (request.header.apiKey() == ApiKeys.METADATA.id) { + if (request.header.apiKey() == ApiKeys.METADATA) { metadataUpdater.handleDisconnection(request.destination); } } else { - requestTypes.add(ApiKeys.forId(request.header.apiKey())); + requestTypes.add(request.header.apiKey()); abortedSends.add(new ClientResponse(request.header, request.callback, request.destination, request.createdTimeMs, now, true, null, null)); @@ -275,7 +275,7 @@ public class NetworkClient implements KafkaClient { public void close(String nodeId) { selector.close(nodeId); for (InFlightRequest request : inFlightRequests.clearAll(nodeId)) - if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id) + if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA) metadataUpdater.handleDisconnection(request.destination); connectionStates.remove(nodeId); } @@ -556,27 +556,21 @@ public class NetworkClient implements KafkaClient { } public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { - return createResponse(parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, - null, 0), requestHeader); + Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0); + return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct); } private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, Sensor throttleTimeSensor, long now) { ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer); - ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); // Always expect the response version id to be the same as the request version id - Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer); + Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer); correlate(requestHeader, responseHeader); if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME)) throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now); return responseBody; } - private static AbstractResponse createResponse(Struct responseStruct, RequestHeader requestHeader) { - ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); - return AbstractResponse.getResponse(apiKey, responseStruct); - } - /** * Post process disconnection of a node * @@ -602,7 +596,7 @@ public class NetworkClient implements KafkaClient { for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) { log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected", request.request, request.header.correlationId(), nodeId); - if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id) + if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA) metadataUpdater.handleDisconnection(request.destination); else responses.add(request.disconnected(now)); @@ -666,9 +660,9 @@ public class NetworkClient implements KafkaClient { throttleTimeSensor, now); if (log.isTraceEnabled()) { log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination, - ApiKeys.forId(req.header.apiKey()), req.header.correlationId(), responseStruct); + req.header.apiKey(), req.header.correlationId(), responseStruct); } - AbstractResponse body = createResponse(responseStruct, req.header); + AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct); if (req.isInternalRequest && body instanceof MetadataResponse) metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); else if (req.isInternalRequest && body instanceof ApiVersionsResponse) http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 4a38d04..803a853 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.LogContext; @@ -489,10 +488,9 @@ public class ConsumerNetworkClient implements Closeable { future.raise(e); } else if (response.wasDisconnected()) { RequestHeader requestHeader = response.requestHeader(); - ApiKeys api = ApiKeys.forId(requestHeader.apiKey()); int correlation = requestHeader.correlationId(); log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected", - api, requestHeader, correlation, response.destination()); + requestHeader.apiKey(), requestHeader, correlation, response.destination()); future.raise(DisconnectException.INSTANCE); } else if (response.versionMismatch() != null) { future.raise(response.versionMismatch()); http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 806cfdf..411282b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -44,7 +44,6 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; @@ -453,8 +452,8 @@ public class Sender implements Runnable { RequestHeader requestHeader = response.requestHeader(); int correlationId = requestHeader.correlationId(); if (response.wasDisconnected()) { - ApiKeys api = ApiKeys.forId(requestHeader.apiKey()); - log.trace("Cancelled {} request {} with correlation id {} due to node {} being disconnected", api, requestHeader, correlationId, response.destination()); + log.trace("Cancelled request with header {} due to node {} being disconnected", + requestHeader, response.destination()); for (ProducerBatch batch : batches.values()) completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now); } else if (response.versionMismatch() != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 5145be7..225f5a8 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -98,7 +98,7 @@ public class ChannelBuilders { case SASL_PLAINTEXT: requireNonNullMode(mode, securityProtocol); JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs); - channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, + channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, listenerName, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache); break; case PLAINTEXT: http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 445c1ba..0f98463 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -43,6 +43,7 @@ public class SaslChannelBuilder implements ChannelBuilder { private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class); private final SecurityProtocol securityProtocol; + private final ListenerName listenerName; private final String clientSaslMechanism; private final Mode mode; private final JaasContext jaasContext; @@ -54,12 +55,17 @@ public class SaslChannelBuilder implements ChannelBuilder { private Map<String, ?> configs; private KerberosShortNamer kerberosShortNamer; - public SaslChannelBuilder(Mode mode, JaasContext jaasContext, SecurityProtocol securityProtocol, + public SaslChannelBuilder(Mode mode, + JaasContext jaasContext, + SecurityProtocol securityProtocol, + ListenerName listenerName, String clientSaslMechanism, - boolean handshakeRequestEnable, CredentialCache credentialCache) { + boolean handshakeRequestEnable, + CredentialCache credentialCache) { this.mode = mode; this.jaasContext = jaasContext; this.securityProtocol = securityProtocol; + this.listenerName = listenerName; this.handshakeRequestEnable = handshakeRequestEnable; this.clientSaslMechanism = clientSaslMechanism; this.credentialCache = credentialCache; @@ -109,8 +115,8 @@ public class SaslChannelBuilder implements ChannelBuilder { Authenticator authenticator; if (mode == Mode.SERVER) authenticator = new SaslServerAuthenticator(id, jaasContext, loginManager.subject(), - kerberosShortNamer, socketChannel.socket().getLocalAddress().getHostName(), - credentialCache); + kerberosShortNamer, socketChannel.socket().getLocalAddress(), credentialCache, + listenerName, securityProtocol); else authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(), socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable); @@ -161,8 +167,7 @@ public class SaslChannelBuilder implements ChannelBuilder { } getInstanceMethod = classRef.getMethod("getInstance", new Class[0]); kerbConf = getInstanceMethod.invoke(classRef, new Object[0]); - getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm", - new Class[0]); + getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm", new Class[0]); return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/network/Send.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java index c64193a..e6febc8 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Send.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java @@ -20,12 +20,12 @@ import java.io.IOException; import java.nio.channels.GatheringByteChannel; /** - * This interface models the in-progress sending of data to a destination identified by an integer id. + * This interface models the in-progress sending of data to a specific destination */ public interface Send { /** - * The numeric id for the destination of this send + * The id for the destination of this send */ String destination(); http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 2cd88e1..00de8c1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -105,119 +105,81 @@ public abstract class AbstractRequest extends AbstractRequestResponse { public abstract AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e); /** - * Factory method for getting a request object based on ApiKey ID and a buffer + * Factory method for getting a request object based on ApiKey ID and a version */ - public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) { - ApiKeys apiKey = ApiKeys.forId(requestId); - Struct struct = apiKey.parseRequest(version, buffer); - AbstractRequest request; + public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Struct struct) { switch (apiKey) { case PRODUCE: - request = new ProduceRequest(struct, version); - break; + return new ProduceRequest(struct, apiVersion); case FETCH: - request = new FetchRequest(struct, version); - break; + return new FetchRequest(struct, apiVersion); case LIST_OFFSETS: - request = new ListOffsetRequest(struct, version); - break; + return new ListOffsetRequest(struct, apiVersion); case METADATA: - request = new MetadataRequest(struct, version); - break; + return new MetadataRequest(struct, apiVersion); case OFFSET_COMMIT: - request = new OffsetCommitRequest(struct, version); - break; + return new OffsetCommitRequest(struct, apiVersion); case OFFSET_FETCH: - request = new OffsetFetchRequest(struct, version); - break; + return new OffsetFetchRequest(struct, apiVersion); case FIND_COORDINATOR: - request = new FindCoordinatorRequest(struct, version); - break; + return new FindCoordinatorRequest(struct, apiVersion); case JOIN_GROUP: - request = new JoinGroupRequest(struct, version); - break; + return new JoinGroupRequest(struct, apiVersion); case HEARTBEAT: - request = new HeartbeatRequest(struct, version); - break; + return new HeartbeatRequest(struct, apiVersion); case LEAVE_GROUP: - request = new LeaveGroupRequest(struct, version); - break; + return new LeaveGroupRequest(struct, apiVersion); case SYNC_GROUP: - request = new SyncGroupRequest(struct, version); - break; + return new SyncGroupRequest(struct, apiVersion); case STOP_REPLICA: - request = new StopReplicaRequest(struct, version); - break; + return new StopReplicaRequest(struct, apiVersion); case CONTROLLED_SHUTDOWN_KEY: - request = new ControlledShutdownRequest(struct, version); - break; + return new ControlledShutdownRequest(struct, apiVersion); case UPDATE_METADATA_KEY: - request = new UpdateMetadataRequest(struct, version); - break; + return new UpdateMetadataRequest(struct, apiVersion); case LEADER_AND_ISR: - request = new LeaderAndIsrRequest(struct, version); - break; + return new LeaderAndIsrRequest(struct, apiVersion); case DESCRIBE_GROUPS: - request = new DescribeGroupsRequest(struct, version); - break; + return new DescribeGroupsRequest(struct, apiVersion); case LIST_GROUPS: - request = new ListGroupsRequest(struct, version); - break; + return new ListGroupsRequest(struct, apiVersion); case SASL_HANDSHAKE: - request = new SaslHandshakeRequest(struct, version); - break; + return new SaslHandshakeRequest(struct, apiVersion); case API_VERSIONS: - request = new ApiVersionsRequest(struct, version); - break; + return new ApiVersionsRequest(struct, apiVersion); case CREATE_TOPICS: - request = new CreateTopicsRequest(struct, version); - break; + return new CreateTopicsRequest(struct, apiVersion); case DELETE_TOPICS: - request = new DeleteTopicsRequest(struct, version); - break; + return new DeleteTopicsRequest(struct, apiVersion); case DELETE_RECORDS: - request = new DeleteRecordsRequest(struct, version); - break; + return new DeleteRecordsRequest(struct, apiVersion); case INIT_PRODUCER_ID: - request = new InitProducerIdRequest(struct, version); - break; + return new InitProducerIdRequest(struct, apiVersion); case OFFSET_FOR_LEADER_EPOCH: - request = new OffsetsForLeaderEpochRequest(struct, version); - break; + return new OffsetsForLeaderEpochRequest(struct, apiVersion); case ADD_PARTITIONS_TO_TXN: - request = new AddPartitionsToTxnRequest(struct, version); - break; + return new AddPartitionsToTxnRequest(struct, apiVersion); case ADD_OFFSETS_TO_TXN: - request = new AddOffsetsToTxnRequest(struct, version); - break; + return new AddOffsetsToTxnRequest(struct, apiVersion); case END_TXN: - request = new EndTxnRequest(struct, version); - break; + return new EndTxnRequest(struct, apiVersion); case WRITE_TXN_MARKERS: - request = new WriteTxnMarkersRequest(struct, version); - break; + return new WriteTxnMarkersRequest(struct, apiVersion); case TXN_OFFSET_COMMIT: - request = new TxnOffsetCommitRequest(struct, version); - break; + return new TxnOffsetCommitRequest(struct, apiVersion); case DESCRIBE_ACLS: - request = new DescribeAclsRequest(struct, version); - break; + return new DescribeAclsRequest(struct, apiVersion); case CREATE_ACLS: - request = new CreateAclsRequest(struct, version); - break; + return new CreateAclsRequest(struct, apiVersion); case DELETE_ACLS: - request = new DeleteAclsRequest(struct, version); - break; + return new DeleteAclsRequest(struct, apiVersion); case DESCRIBE_CONFIGS: - request = new DescribeConfigsRequest(struct, version); - break; + return new DescribeConfigsRequest(struct, apiVersion); case ALTER_CONFIGS: - request = new AlterConfigsRequest(struct, version); - break; + return new AlterConfigsRequest(struct, apiVersion); default: - throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " + + throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); } - return new RequestAndSize(request, struct.sizeOf()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 1686976..5f1f615 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -27,21 +27,12 @@ public abstract class AbstractResponse extends AbstractRequestResponse { public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; public static final int DEFAULT_THROTTLE_TIME = 0; - public Send toSend(String destination, RequestHeader requestHeader) { - return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader()); + protected Send toSend(String destination, ResponseHeader header, short apiVersion) { + return new NetworkSend(destination, serialize(apiVersion, header)); } /** - * This should only be used if we need to return a response with a different version than the request, which - * should be very rare (an example is @link {@link ApiVersionsResponse#unsupportedVersionSend(String, RequestHeader)}). - * Typically {@link #toSend(String, RequestHeader)} should be used. - */ - public Send toSend(String destination, short version, ResponseHeader responseHeader) { - return new NetworkSend(destination, serialize(version, responseHeader)); - } - - /** - * Visible for testing, typically {@link #toSend(String, RequestHeader)} should be used instead. + * Visible for testing, typically {@link #toSend(String, ResponseHeader, short)} should be used instead. */ public ByteBuffer serialize(short version, ResponseHeader responseHeader) { return serialize(responseHeader.toStruct(), toStruct(version)); @@ -49,7 +40,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { protected abstract Struct toStruct(short version); - public static AbstractResponse getResponse(ApiKeys apiKey, Struct struct) { + public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct) { switch (apiKey) { case PRODUCE: return new ProduceResponse(struct); @@ -120,7 +111,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case ALTER_CONFIGS: return new AlterConfigsResponse(struct); default: - throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " + + throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java index 6f63040..025ef6c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -45,12 +45,29 @@ public class ApiVersionsRequest extends AbstractRequest { } } + private final Short unsupportedRequestVersion; + public ApiVersionsRequest(short version) { + this(version, null); + } + + public ApiVersionsRequest(short version, Short unsupportedRequestVersion) { super(version); + + // Unlike other request types, the broker handles ApiVersion requests with higher versions than + // supported. It does so by treating the request as if it were v0 and returns a response using + // the v0 response schema. The reason for this is that the client does not yet know what versions + // a broker supports when this request is sent, so instead of assuming the lowest supported version, + // it can use the most recent version and only fallback to the old version when necessary. + this.unsupportedRequestVersion = unsupportedRequestVersion; } public ApiVersionsRequest(Struct struct, short version) { - super(version); + this(version, null); + } + + public boolean hasUnsupportedRequestVersion() { + return unsupportedRequestVersion != null; } @Override @@ -59,16 +76,16 @@ public class ApiVersionsRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short versionId = version(); - switch (versionId) { + public ApiVersionsResponse getErrorResponse(int throttleTimeMs, Throwable e) { + short version = version(); + switch (version) { case 0: return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList()); case 1: return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.API_VERSIONS.latestVersion())); + version, this.getClass().getSimpleName(), ApiKeys.API_VERSIONS.latestVersion())); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index e9d5023..5a48c93 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -25,7 +24,6 @@ import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -123,15 +121,6 @@ public class ApiVersionsResponse extends AbstractResponse { return createApiVersionsResponse(throttleTimeMs, maxMagic); } - /** - * Returns Errors.UNSUPPORTED_VERSION response with version 0 since we don't support the requested version. - */ - public static Send unsupportedVersionSend(String destination, RequestHeader requestHeader) { - ApiVersionsResponse response = new ApiVersionsResponse(DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION, - Collections.<ApiVersion>emptyList()); - return response.toSend(destination, (short) 0, requestHeader.toResponseHeader()); - } - public int throttleTimeMs() { return throttleTimeMs; } @@ -169,4 +158,5 @@ public class ApiVersionsResponse extends AbstractResponse { } return tempApiIdToApiVersion; } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index b52b6f5..281ad44 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -235,26 +235,19 @@ public class FetchResponse extends AbstractResponse { } @Override - public Send toSend(String dest, RequestHeader requestHeader) { - return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, dest, requestHeader); - } - - public Send toSend(int throttleTimeMs, String dest, RequestHeader requestHeader) { - return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, dest, requestHeader); - } - - private Send toSend(Struct responseStruct, int throttleTimeMs, String dest, RequestHeader requestHeader) { - Struct responseHeader = new ResponseHeader(requestHeader.correlationId()).toStruct(); + protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { + Struct responseHeaderStruct = responseHeader.toStruct(); + Struct responseBodyStruct = toStruct(apiVersion); // write the total size and the response header - ByteBuffer buffer = ByteBuffer.allocate(responseHeader.sizeOf() + 4); - buffer.putInt(responseHeader.sizeOf() + responseStruct.sizeOf()); - responseHeader.writeTo(buffer); + ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4); + buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf()); + responseHeaderStruct.writeTo(buffer); buffer.rewind(); List<Send> sends = new ArrayList<>(); sends.add(new ByteBufferSend(dest, buffer)); - addResponseData(responseStruct, throttleTimeMs, dest, sends); + addResponseData(responseBodyStruct, throttleTimeMs, dest, sends); return new MultiSend(dest, sends); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java new file mode 100644 index 0000000..34bb3f5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Protocol; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.security.auth.KafkaPrincipal; + +import java.net.InetAddress; +import java.nio.ByteBuffer; + +import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS; + +public class RequestContext { + public final RequestHeader header; + public final String connectionId; + public final InetAddress clientAddress; + public final KafkaPrincipal principal; + public final ListenerName listenerName; + public final SecurityProtocol securityProtocol; + + public RequestContext(RequestHeader header, + String connectionId, + InetAddress clientAddress, + KafkaPrincipal principal, + ListenerName listenerName, + SecurityProtocol securityProtocol) { + this.header = header; + this.connectionId = connectionId; + this.clientAddress = clientAddress; + this.principal = principal; + this.listenerName = listenerName; + this.securityProtocol = securityProtocol; + } + + public RequestAndSize parseRequest(ByteBuffer buffer) { + if (isUnsupportedApiVersionsRequest()) { + // Unsupported ApiVersion requests are treated as v0 requests and are not parsed + ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest((short) 0, header.apiVersion()); + return new RequestAndSize(apiVersionsRequest, 0); + } else { + ApiKeys apiKey = header.apiKey(); + try { + short apiVersion = header.apiVersion(); + Struct struct = apiKey.parseRequest(apiVersion, buffer); + AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct); + return new RequestAndSize(body, struct.sizeOf()); + } catch (Throwable ex) { + throw new InvalidRequestException("Error getting request for apiKey: " + apiKey + + ", apiVersion: " + header.apiVersion() + + ", connectionId: " + connectionId + + ", listenerName: " + listenerName + + ", principal: " + principal, ex); + } + } + } + + public Send buildResponse(AbstractResponse body) { + ResponseHeader responseHeader = header.toResponseHeader(); + short version = header.apiVersion(); + + // Use v0 when serializing an unhandled ApiVersion response + if (isUnsupportedApiVersionsRequest()) + version = 0; + + return body.toSend(connectionId, responseHeader, version); + } + + private boolean isUnsupportedApiVersionsRequest() { + return header.apiKey() == API_VERSIONS && !Protocol.apiVersionSupported(API_VERSIONS.id, header.apiVersion()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index 18ea576..43b7baf 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -16,12 +16,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Protocol; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static java.util.Objects.requireNonNull; + /** * The header for a request in the Kafka protocol */ @@ -31,13 +35,17 @@ public class RequestHeader extends AbstractRequestResponse { private static final String CLIENT_ID_FIELD_NAME = "client_id"; private static final String CORRELATION_ID_FIELD_NAME = "correlation_id"; - private final short apiKey; + private final ApiKeys apiKey; private final short apiVersion; private final String clientId; private final int correlationId; public RequestHeader(Struct struct) { - apiKey = struct.getShort(API_KEY_FIELD_NAME); + short apiKey = struct.getShort(API_KEY_FIELD_NAME); + if (!ApiKeys.hasId(apiKey)) + throw new InvalidRequestException("Unknown API key " + apiKey); + + this.apiKey = ApiKeys.forId(apiKey); apiVersion = struct.getShort(API_VERSION_FIELD_NAME); // only v0 of the controlled shutdown request is missing the clientId @@ -48,17 +56,17 @@ public class RequestHeader extends AbstractRequestResponse { correlationId = struct.getInt(CORRELATION_ID_FIELD_NAME); } - public RequestHeader(short apiKey, short version, String clientId, int correlation) { - this.apiKey = apiKey; + public RequestHeader(ApiKeys apiKey, short version, String clientId, int correlation) { + this.apiKey = requireNonNull(apiKey); this.apiVersion = version; this.clientId = clientId; this.correlationId = correlation; } public Struct toStruct() { - Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion); + Schema schema = Protocol.requestHeaderSchema(apiKey.id, apiVersion); Struct struct = new Struct(schema); - struct.set(API_KEY_FIELD_NAME, apiKey); + struct.set(API_KEY_FIELD_NAME, apiKey.id); struct.set(API_VERSION_FIELD_NAME, apiVersion); // only v0 of the controlled shutdown request is missing the clientId @@ -68,7 +76,7 @@ public class RequestHeader extends AbstractRequestResponse { return struct; } - public short apiKey() { + public ApiKeys apiKey() { return apiKey; } @@ -89,16 +97,27 @@ public class RequestHeader extends AbstractRequestResponse { } public static RequestHeader parse(ByteBuffer buffer) { - short apiKey = buffer.getShort(); - short apiVersion = buffer.getShort(); - Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion); - buffer.rewind(); - return new RequestHeader(schema.read(buffer)); + try { + short apiKey = buffer.getShort(); + short apiVersion = buffer.getShort(); + Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion); + buffer.rewind(); + return new RequestHeader(schema.read(buffer)); + } catch (InvalidRequestException e) { + throw e; + } catch (Throwable ex) { + throw new InvalidRequestException("Error parsing request header. Our best guess of the apiKey is: " + + buffer.getShort(0), ex); + } } @Override public String toString() { - return toStruct().toString(); + return "RequestHeader(apiKey=" + apiKey + + ", apiVersion=" + apiVersion + + ", clientId=" + clientId + + ", correlationId=" + correlationId + + ")"; } @Override @@ -115,11 +134,10 @@ public class RequestHeader extends AbstractRequestResponse { @Override public int hashCode() { - int result = (int) apiKey; + int result = apiKey.hashCode(); result = 31 * result + (int) apiVersion; result = 31 * result + (clientId != null ? clientId.hashCode() : 0); result = 31 * result + correlationId; return result; } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 6dab8f9..6bf9b2a 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -159,7 +159,7 @@ public class SaslClientAuthenticator implements Authenticator { // fetch supported versions. String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG); SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism); - currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, + currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version(), clientId, correlationId++); send(handshakeRequest.toSend(node, currentRequestHeader)); setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE); @@ -308,20 +308,18 @@ public class SaslClientAuthenticator implements Authenticator { private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) { AbstractResponse response; - ApiKeys apiKey; try { response = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader); - apiKey = ApiKeys.forId(requestHeader.apiKey()); } catch (SchemaException | IllegalArgumentException e) { LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens"); throw new AuthenticationException("Invalid SASL mechanism response", e); } - switch (apiKey) { + switch (requestHeader.apiKey()) { case SASL_HANDSHAKE: handleSaslHandshakeResponse((SaslHandshakeResponse) response); break; default: - throw new IllegalStateException("Unexpected API key during handshake: " + apiKey); + throw new IllegalStateException("Unexpected API key during handshake: " + requestHeader.apiKey()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index ca2d3eb..276d067 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -20,9 +20,10 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; @@ -30,11 +31,12 @@ import org.apache.kafka.common.network.Send; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.Protocol; -import org.apache.kafka.common.protocol.types.SchemaException; -import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.RequestAndSize; +import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.SaslHandshakeRequest; import org.apache.kafka.common.requests.SaslHandshakeResponse; @@ -61,6 +63,7 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import java.io.IOException; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.security.Principal; @@ -81,11 +84,13 @@ public class SaslServerAuthenticator implements Authenticator { GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED } - private final String node; + private final SecurityProtocol securityProtocol; + private final ListenerName listenerName; + private final String connectionId; private final JaasContext jaasContext; private final Subject subject; private final KerberosShortNamer kerberosNamer; - private final String host; + private final InetAddress clientAddress; private final CredentialCache credentialCache; // Current SASL state @@ -105,17 +110,24 @@ public class SaslServerAuthenticator implements Authenticator { private NetworkReceive netInBuffer; private Send netOutBuffer; - public SaslServerAuthenticator(String node, JaasContext jaasContext, final Subject subject, - KerberosShortNamer kerberosNameParser, String host, - CredentialCache credentialCache) throws IOException { + public SaslServerAuthenticator(String connectionId, + JaasContext jaasContext, + Subject subject, + KerberosShortNamer kerberosNameParser, + InetAddress clientAddress, + CredentialCache credentialCache, + ListenerName listenerName, + SecurityProtocol securityProtocol) throws IOException { if (subject == null) throw new IllegalArgumentException("subject cannot be null"); - this.node = node; + this.connectionId = connectionId; this.jaasContext = jaasContext; this.subject = subject; this.kerberosNamer = kerberosNameParser; - this.host = host; + this.clientAddress = clientAddress; this.credentialCache = credentialCache; + this.listenerName = listenerName; + this.securityProtocol = securityProtocol; } public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) { @@ -140,7 +152,7 @@ public class SaslServerAuthenticator implements Authenticator { try { saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() { public SaslServer run() throws SaslException { - return Sasl.createSaslServer(saslMechanism, "kafka", host, configs, callbackHandler); + return Sasl.createSaslServer(saslMechanism, "kafka", clientAddress.getHostName(), configs, callbackHandler); } }); } catch (PrivilegedActionException e) { @@ -211,7 +223,7 @@ public class SaslServerAuthenticator implements Authenticator { return; } - if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, node); + if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId); netInBuffer.readFrom(transportLayer); @@ -233,7 +245,7 @@ public class SaslServerAuthenticator implements Authenticator { case AUTHENTICATE: byte[] response = saslServer.evaluateResponse(clientToken); if (response != null) { - netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response)); + netOutBuffer = new NetworkSend(connectionId, ByteBuffer.wrap(response)); flushNetOutBufferAndUpdateInterestOps(); } // When the authentication exchange is complete and no more tokens are expected from the client, @@ -298,38 +310,32 @@ public class SaslServerAuthenticator implements Authenticator { String clientMechanism = null; try { ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); - RequestHeader requestHeader = RequestHeader.parse(requestBuffer); - ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); + RequestHeader header = RequestHeader.parse(requestBuffer); + ApiKeys apiKey = header.apiKey(); + // A valid Kafka request header was received. SASL authentication tokens are now expected only // following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client. setSaslState(SaslState.HANDSHAKE_REQUEST); isKafkaRequest = true; - if (!Protocol.apiVersionSupported(requestHeader.apiKey(), requestHeader.apiVersion())) { - if (apiKey == ApiKeys.API_VERSIONS) - sendKafkaResponse(ApiVersionsResponse.unsupportedVersionSend(node, requestHeader)); - else - throw new UnsupportedVersionException("Version " + requestHeader.apiVersion() + " is not supported for apiKey " + apiKey); - } else { - LOG.debug("Handle Kafka request {}", apiKey); - switch (apiKey) { - case API_VERSIONS: - handleApiVersionsRequest(requestHeader); - break; - case SASL_HANDSHAKE: - short version = requestHeader.apiVersion(); - Struct struct = ApiKeys.SASL_HANDSHAKE.parseRequest(version, requestBuffer); - SaslHandshakeRequest saslHandshakeRequest = new SaslHandshakeRequest(struct, version); - clientMechanism = handleHandshakeRequest(requestHeader, saslHandshakeRequest); - break; - default: - throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake."); - } - } - } catch (SchemaException | IllegalArgumentException e) { + // Raise an error prior to parsing if the api cannot be handled at this layer. This avoids + // unnecessary exposure to some of the more complex schema types. + if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE) + throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake."); + + LOG.debug("Handling Kafka request {}", apiKey); + + RequestContext requestContext = new RequestContext(header, connectionId, clientAddress, + KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol); + RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer); + if (apiKey == ApiKeys.API_VERSIONS) + handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request); + else + clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request); + } catch (InvalidRequestException e) { if (saslState == SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) { - // SchemaException is thrown if the request is not in Kafka format. IllegalArgumentException is thrown - // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token + // InvalidRequestException is thrown if the request is not in Kafka format or if the API key + // is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token // starting with 0x60, revert to GSSAPI for both these exceptions. if (LOG.isDebugEnabled()) { StringBuilder tokenBuilder = new StringBuilder(); @@ -355,25 +361,28 @@ public class SaslServerAuthenticator implements Authenticator { return isKafkaRequest; } - private String handleHandshakeRequest(RequestHeader requestHeader, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException { + private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException { String clientMechanism = handshakeRequest.mechanism(); if (enabledMechanisms.contains(clientMechanism)) { LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism); - sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.NONE, enabledMechanisms)); + sendKafkaResponse(context, new SaslHandshakeResponse(Errors.NONE, enabledMechanisms)); return clientMechanism; } else { LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism); - sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms)); + sendKafkaResponse(context, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms)); throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism); } } - private void handleApiVersionsRequest(RequestHeader requestHeader) throws IOException, UnsupportedSaslMechanismException { - sendKafkaResponse(requestHeader, ApiVersionsResponse.API_VERSIONS_RESPONSE); + private void handleApiVersionsRequest(RequestContext context, ApiVersionsRequest apiVersionsRequest) throws IOException, UnsupportedSaslMechanismException { + if (apiVersionsRequest.hasUnsupportedRequestVersion()) + sendKafkaResponse(context, apiVersionsRequest.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception())); + else + sendKafkaResponse(context, ApiVersionsResponse.API_VERSIONS_RESPONSE); } - private void sendKafkaResponse(RequestHeader requestHeader, AbstractResponse response) throws IOException { - sendKafkaResponse(response.toSend(node, requestHeader)); + private void sendKafkaResponse(RequestContext context, AbstractResponse response) throws IOException { + sendKafkaResponse(context.buildResponse(response)); } private void sendKafkaResponse(Send send) throws IOException { http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java index 2f41c77..275104a 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java @@ -71,8 +71,8 @@ public class SaslChannelBuilderTest { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>()); JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig); - return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, "PLAIN", - true, null); + return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"), + "PLAIN", true, null); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java new file mode 100644 index 0000000..7679711 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.junit.Test; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RequestContextTest { + + @Test + public void testSerdeUnsupportedApiVersionRequest() throws Exception { + int correlationId = 23423; + + RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", correlationId); + RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, + new ListenerName("ssl"), SecurityProtocol.SASL_SSL); + + // Write some garbage to the request buffer. This should be ignored since we will treat + // the unknown version type as v0 which has an empty request body. + ByteBuffer requestBuffer = ByteBuffer.allocate(8); + requestBuffer.putInt(3709234); + requestBuffer.putInt(29034); + requestBuffer.flip(); + + RequestAndSize requestAndSize = context.parseRequest(requestBuffer); + assertTrue(requestAndSize.request instanceof ApiVersionsRequest); + ApiVersionsRequest request = (ApiVersionsRequest) requestAndSize.request; + assertTrue(request.hasUnsupportedRequestVersion()); + + Send send = context.buildResponse(new ApiVersionsResponse(0, Errors.UNSUPPORTED_VERSION, + Collections.<ApiVersionsResponse.ApiVersion>emptyList())); + ByteBufferChannel channel = new ByteBufferChannel(256); + send.writeTo(channel); + + ByteBuffer responseBuffer = channel.buffer(); + responseBuffer.flip(); + responseBuffer.getInt(); // strip off the size + + ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer); + assertEquals(correlationId, responseHeader.correlationId()); + + Struct struct = ApiKeys.API_VERSIONS.parseResponse((short) 0, responseBuffer); + ApiVersionsResponse response = (ApiVersionsResponse) AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct); + assertEquals(Errors.UNSUPPORTED_VERSION, response.error()); + assertTrue(response.apiVersions().isEmpty()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java index a1184d8..bc3bd37 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java @@ -39,7 +39,7 @@ public class RequestHeaderTest { rawBuffer.flip(); RequestHeader deserialized = RequestHeader.parse(rawBuffer); - assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, deserialized.apiKey()); + assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY, deserialized.apiKey()); assertEquals(0, deserialized.apiVersion()); assertEquals(correlationId, deserialized.correlationId()); assertEquals("", deserialized.clientId()); @@ -55,7 +55,7 @@ public class RequestHeaderTest { @Test public void testRequestHeader() { - RequestHeader header = new RequestHeader((short) 10, (short) 1, "", 10); + RequestHeader header = new RequestHeader(ApiKeys.FIND_COORDINATOR, (short) 1, "", 10); ByteBuffer buffer = toBuffer(header.toStruct()); RequestHeader deserialized = RequestHeader.parse(buffer); assertEquals(header, deserialized); @@ -63,7 +63,7 @@ public class RequestHeaderTest { @Test public void testRequestHeaderWithNullClientId() { - RequestHeader header = new RequestHeader((short) 10, (short) 1, null, 10); + RequestHeader header = new RequestHeader(ApiKeys.FIND_COORDINATOR, (short) 1, null, 10); Struct headerStruct = header.toStruct(); ByteBuffer buffer = toBuffer(headerStruct); RequestHeader deserialized = RequestHeader.parse(buffer); http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 42d4205..067fc27 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -456,10 +456,10 @@ public class RequestResponseTest { @Test public void verifyFetchResponseFullWrite() throws Exception { FetchResponse fetchResponse = createFetchResponse(); - RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, ApiKeys.FETCH.latestVersion(), - "client", 15); + short apiVersion = ApiKeys.FETCH.latestVersion(); + int correlationId = 15; - Send send = fetchResponse.toSend("1", header); + Send send = fetchResponse.toSend("1", new ResponseHeader(correlationId), apiVersion); ByteBufferChannel channel = new ByteBufferChannel(send.size()); send.writeTo(channel); channel.close(); @@ -472,11 +472,11 @@ public class RequestResponseTest { // read the header ResponseHeader responseHeader = ResponseHeader.parse(channel.buffer()); - assertEquals(header.correlationId(), responseHeader.correlationId()); + assertEquals(correlationId, responseHeader.correlationId()); // read the body - Struct responseBody = ApiKeys.FETCH.responseSchema(header.apiVersion()).read(buf); - assertEquals(fetchResponse.toStruct(header.apiVersion()), responseBody); + Struct responseBody = ApiKeys.FETCH.responseSchema(apiVersion).read(buf); + assertEquals(fetchResponse.toStruct(apiVersion), responseBody); assertEquals(size, responseHeader.sizeOf() + responseBody.sizeOf()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 77cbdbe..6b0eca3 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -392,7 +392,7 @@ public class SaslAuthenticatorTest { // Send ApiVersionsRequest with unsupported version and validate error response. String node = "1"; createClientConnection(SecurityProtocol.PLAINTEXT, node); - RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE, "someclient", 1); + RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "someclient", 1); ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(); selector.send(request.toSend(node, header)); ByteBuffer responseBuffer = waitForResponse(); @@ -425,7 +425,7 @@ public class SaslAuthenticatorTest { String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN"); - RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2); + RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, Short.MAX_VALUE, "someclient", 2); selector.send(request.toSend(node1, header)); NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); @@ -490,8 +490,7 @@ public class SaslAuthenticatorTest { sendHandshakeRequestReceiveResponse(node1); ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(); - RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, - request.version(), "someclient", 2); + RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS, request.version(), "someclient", 2); selector.send(request.toSend(node1, versionsHeader)); NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); @@ -556,7 +555,7 @@ public class SaslAuthenticatorTest { createClientConnection(SecurityProtocol.PLAINTEXT, node1); MetadataRequest metadataRequest1 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build(); - RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest1.version(), + RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA, metadataRequest1.version(), "someclient", 1); selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1)); NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); @@ -570,7 +569,7 @@ public class SaslAuthenticatorTest { createClientConnection(SecurityProtocol.PLAINTEXT, node2); sendHandshakeRequestReceiveResponse(node2); MetadataRequest metadataRequest2 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build(); - RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, + RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA, metadataRequest2.version(), "someclient", 2); selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2)); NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY); @@ -829,8 +828,7 @@ public class SaslAuthenticatorTest { } private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException { - RequestHeader header = - new RequestHeader(apiKey.id, request.version(), "someclient", 1); + RequestHeader header = new RequestHeader(apiKey, request.version(), "someclient", 1); Send send = request.toSend(node, header); selector.send(send); ByteBuffer responseBuffer = waitForResponse(); http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index b76f3cc..d37c206 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -19,8 +19,10 @@ package org.apache.kafka.common.security.authenticator; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.network.InvalidReceiveException; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.security.JaasContext; @@ -33,6 +35,7 @@ import org.junit.Test; import javax.security.auth.Subject; import java.io.IOException; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; @@ -75,7 +78,7 @@ public class SaslServerAuthenticatorTest { Map<String, ?> configs = Collections.singletonMap(SaslConfigs.SASL_ENABLED_MECHANISMS, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - final RequestHeader header = new RequestHeader(ApiKeys.METADATA.id, (short) 0, "clientId", 13243); + final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243); final Struct headerStruct = header.toStruct(); final Capture<ByteBuffer> size = EasyMock.newCapture(); @@ -113,7 +116,8 @@ public class SaslServerAuthenticatorTest { jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>()); JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig); Subject subject = new Subject(); - return new SaslServerAuthenticator("node", jaasContext, subject, null, "localhost", new CredentialCache()); + return new SaslServerAuthenticator("node", jaasContext, subject, null, InetAddress.getLocalHost(), + new CredentialCache(), new ListenerName("ssl"), SecurityProtocol.SASL_SSL); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index b89fb62..57ff203 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -238,7 +238,7 @@ class RequestSendThread(val controllerId: Int, } if (clientResponse != null) { val requestHeader = clientResponse.requestHeader - val api = ApiKeys.forId(requestHeader.apiKey) + val api = requestHeader.apiKey if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY) throw new KafkaException(s"Unexpected apiKey received: $apiKey") http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index 19c37fa..150b444 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -37,9 +37,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, val requestHeader = response.requestHeader val correlationId = requestHeader.correlationId if (response.wasDisconnected) { - val api = ApiKeys.forId(requestHeader.apiKey) - val correlation = requestHeader.correlationId - trace(s"Cancelled $api request $requestHeader with correlation id $correlation due to node ${response.destination} being disconnected") + trace(s"Cancelled request with header $requestHeader due to node ${response.destination} being disconnected") for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) { val transactionalId = txnIdAndMarker.txnId