KAFKA-4763; Handle disk failure for JBOD (KIP-112)

Author: Dong Lin <[email protected]>
Reviewers: Jiangjie Qin <[email protected]>, Jun Rao <[email protected]>, Ismael Juma <[email protected]>, Onur Karaman <[email protected]>
Closes #2929 from lindong28/KAFKA-4763

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc93fb4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc93fb4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc93fb4b

Branch: refs/heads/trunk
Commit: fc93fb4b6116e809f2b69ddb2f7e0f12548fef51
Parents: 91b5fc7
Author: Dong Lin <[email protected]>
Authored: Sat Jul 22 12:35:32 2017 -0700
Committer: Jiangjie Qin <[email protected]>
Committed: Sat Jul 22 12:35:32 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java | 5 +-
 .../org/apache/kafka/common/PartitionInfo.java | 21 +-
 .../common/errors/KafkaStorageException.java | 50 ++
 .../apache/kafka/common/protocol/Errors.java | 21 +-
 .../apache/kafka/common/protocol/Protocol.java | 138 +++++-
 .../apache/kafka/common/record/FileRecords.java | 7 +
 .../kafka/common/requests/FetchResponse.java | 11 +-
 .../common/requests/LeaderAndIsrRequest.java | 12 +-
 .../kafka/common/requests/MetadataRequest.java | 1 +
 .../kafka/common/requests/MetadataResponse.java | 57 ++-
 .../kafka/common/requests/PartitionState.java | 8 +-
 .../kafka/common/requests/ProduceRequest.java | 2 +
 .../kafka/common/requests/ProduceResponse.java | 9 +-
 .../common/requests/UpdateMetadataRequest.java | 58 ++-
 .../internals/AbstractCoordinatorTest.java | 4 +-
 .../internals/ConsumerCoordinatorTest.java | 4 +-
 .../clients/consumer/internals/FetcherTest.java | 3 +-
 .../internals/TransactionManagerTest.java | 2 +-
 .../apache/kafka/common/PartitionInfoTest.java | 15 +-
 .../kafka/common/protocol/ErrorsTest.java | 4 +-
 .../common/requests/RequestResponseTest.java | 21 +-
 .../main/scala/kafka/admin/AdminClient.scala | 2 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
 core/src/main/scala/kafka/api/ApiVersion.scala | 12 +-
 .../src/main/scala/kafka/api/LeaderAndIsr.scala | 15 +-
 .../main/scala/kafka/cluster/Partition.scala | 119 +++--
 .../kafka/common/KafkaStorageException.scala | 6 +-
 .../controller/ControllerChannelManager.scala | 51 +-
 .../kafka/controller/ControllerState.scala | 11 +-
 .../kafka/controller/KafkaController.scala | 169 +++++--
 .../controller/PartitionLeaderSelector.scala | 13 +-
 .../controller/PartitionStateMachine.scala | 7 +-
 .../kafka/controller/ReplicaStateMachine.scala | 5 +-
 .../kafka/controller/TopicDeletionManager.scala | 2 +-
 .../group/GroupMetadataManager.scala | 10 +-
 .../transaction/TransactionCoordinator.scala | 2 +-
 .../transaction/TransactionStateManager.scala | 5 +-
 .../main/scala/kafka/log/AbstractIndex.scala | 6 +
 core/src/main/scala/kafka/log/Log.scala | 495 ++++++++++---------
 core/src/main/scala/kafka/log/LogCleaner.scala | 107 ++--
 .../scala/kafka/log/LogCleanerManager.scala | 59 ++-
 core/src/main/scala/kafka/log/LogManager.scala | 491 +++++++++++-------
 core/src/main/scala/kafka/log/LogSegment.scala | 47 +-
 .../scala/kafka/log/ProducerStateManager.scala | 10 +-
 .../kafka/server/AbstractFetcherThread.scala | 10 +-
 .../kafka/server/BrokerMetadataCheckpoint.scala | 3 +-
 .../kafka/server/DelayedDeleteRecords.scala | 18 +-
 .../main/scala/kafka/server/DelayedFetch.scala | 6 +-
 .../scala/kafka/server/DelayedProduce.scala | 5 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 70 ++-
 .../main/scala/kafka/server/KafkaServer.scala | 42 +-
 .../kafka/server/LogDirFailureChannel.scala | 55 +++
 .../main/scala/kafka/server/MetadataCache.scala | 21 +-
 .../kafka/server/ReplicaFetcherThread.scala | 68 ++-
 .../scala/kafka/server/ReplicaManager.scala | 296 ++++++++---
 .../server/checkpoints/CheckpointFile.scala | 124 ++---
 .../checkpoints/LeaderEpochCheckpointFile.scala | 5 +-
 .../checkpoints/OffsetCheckpointFile.scala | 5 +-
 .../main/scala/kafka/utils/LogDirUtils.scala | 66 +++
 core/src/main/scala/kafka/utils/ZkUtils.scala | 7 +-
 .../kafka/api/AuthorizerIntegrationTest.scala | 8 +-
 .../kafka/api/IntegrationTestHarness.scala | 5 +-
 .../kafka/api/LogDirFailureTest.scala | 126 +++++
 .../kafka/api/TransactionsTest.scala | 4 +-
 .../ReplicaFetcherThreadFatalErrorTest.scala | 2 +-
 .../group/GroupMetadataManagerTest.scala | 10 +-
 ...tionMarkerRequestCompletionHandlerTest.scala | 2 +-
 .../TransactionStateManagerTest.scala | 2 +-
 .../log/AbstractLogCleanerIntegrationTest.scala | 3 +-
 .../kafka/log/LogCleanerIntegrationTest.scala | 2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala | 2 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala | 10 +-
 .../scala/unit/kafka/log/LogManagerTest.scala | 22 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala | 4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 12 +-
 .../server/HighwatermarkPersistenceTest.scala | 26 +-
 .../unit/kafka/server/ISRExpirationTest.scala | 14 +-
 .../unit/kafka/server/LeaderElectionTest.scala | 4 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala | 4 +-
 .../unit/kafka/server/MetadataCacheTest.scala | 18 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala | 6 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala | 4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala | 43 +-
 .../unit/kafka/server/RequestQuotaTest.scala | 7 +-
 .../unit/kafka/server/SimpleFetchTest.scala | 5 +-
 .../checkpoints/OffsetCheckpointFileTest.scala | 14 +-
 .../epoch/OffsetsForLeaderEpochTest.scala | 26 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 20 +-
 docs/upgrade.html | 57 ++-
 .../integration/utils/IntegrationTestUtils.java | 8 +-
 .../internals/InternalTopicManagerTest.java | 2 +-
 tests/kafkatest/services/kafka/config.py | 2 +-
 .../kafkatest/services/kafka/config_property.py | 4 +
 tests/kafkatest/services/kafka/kafka.py | 59 ++-
 tests/kafkatest/services/verifiable_producer.py | 5 +-
 .../tests/core/log_dir_failure_test.py | 177 +++++
 tests/kafkatest/tests/core/transactions_test.py | 3 +-
 97 files changed, 2526 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f5ebac5..87f04b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -864,7 +864,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
                 }
-            } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
+            } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
+                    error == Errors.KAFKA_STORAGE_ERROR) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); this.metadata.requestUpdate(); } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { @@ -884,7 +885,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { log.warn("Not authorized to read from topic {}.", tp.topic()); throw new TopicAuthorizationException(Collections.singleton(tp.topic())); - } else if (error == Errors.UNKNOWN) { + } else if (error == Errors.UNKNOWN_SERVER_ERROR) { log.warn("Unknown error fetching data for topic-partition {}", tp); } else { throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data"); http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java index b351116..38e4f67 100644 --- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java +++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java @@ -17,7 +17,7 @@ package org.apache.kafka.common; /** - * Information about a topic-partition. + * This is used to describe per-partition state in the MetadataResponse. */ public class PartitionInfo { @@ -26,13 +26,20 @@ public class PartitionInfo { private final Node leader; private final Node[] replicas; private final Node[] inSyncReplicas; + private final Node[] offlineReplicas; + // Used only by tests public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) { + this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]); + } + + public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas, Node[] offlineReplicas) { this.topic = topic; this.partition = partition; this.leader = leader; this.replicas = replicas; this.inSyncReplicas = inSyncReplicas; + this.offlineReplicas = offlineReplicas; } /** @@ -71,14 +78,22 @@ public class PartitionInfo { return inSyncReplicas; } + /** + * The subset of the replicas that are offline + */ + public Node[] offlineReplicas() { + return offlineReplicas; + } + @Override public String toString() { - return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s)", + return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)", topic, partition, leader == null ? "none" : leader.idString(), formatNodeIds(replicas), - formatNodeIds(inSyncReplicas)); + formatNodeIds(inSyncReplicas), + formatNodeIds(offlineReplicas)); } /* Extract the node ids from each item in the array and format for display */ http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java new file mode 100644 index 0000000..00c7cee --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/KafkaStorageException.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * A miscellaneous disk-related IOException occurred while handling a request.
+ * The client should request a metadata update and retry if the response shows a KafkaStorageException.
+ *
+ * Here are the guidelines on how to handle KafkaStorageException and IOException:
+ *
+ * 1) If the server has not finished loading logs, IOException does not need to be converted to KafkaStorageException.
+ * 2) After the server has finished loading logs, IOException should be caught and should trigger
+ *    LogDirFailureChannel.maybeAddLogFailureEvent. Then the IOException should either be swallowed and logged,
+ *    or be converted and re-thrown as KafkaStorageException.
+ * 3) It is preferred for IOException to be caught in Log rather than in ReplicaManager or LogSegment.
+ *
+ */
+public class KafkaStorageException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    public KafkaStorageException() {
+        super();
+    }
+
+    public KafkaStorageException(String message) {
+        super(message);
+    }
+
+    public KafkaStorageException(Throwable cause) {
+        super(cause);
+    }
+
+    public KafkaStorageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ae8d161..19acfd6 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
+import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
@@ -84,10 +85,15 @@ import java.util.Map;
 * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
 * are thus part of the protocol. The names can be changed but the error code cannot.
 *
+ * Note that the client library will convert an unknown error code to the non-retriable UnknownServerException if the
+ * client library version is old and does not recognize the newly-added error code.
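A minimal sketch of guideline (2) from the KafkaStorageException javadoc above, for illustration only: everything except KafkaStorageException itself (the FailureChannel interface, the dir field, the segment write) is a hypothetical stand-in, since the broker-side code this patch actually touches is Scala (Log.scala, LogDirFailureChannel.scala).

    import java.io.IOException;
    import org.apache.kafka.common.errors.KafkaStorageException;

    // Hedged sketch of the catch-report-rethrow pattern described in the javadoc above.
    class LogAppendSketch {
        // Hypothetical stand-in for kafka.server.LogDirFailureChannel.
        interface FailureChannel {
            void maybeAddLogFailureEvent(String logDir);
        }

        private final FailureChannel logDirFailureChannel;
        private final String dir;

        LogAppendSketch(FailureChannel logDirFailureChannel, String dir) {
            this.logDirFailureChannel = logDirFailureChannel;
            this.dir = dir;
        }

        void append(byte[] record) {
            try {
                writeToSegment(record); // the actual disk I/O happens here
            } catch (IOException e) {
                // Guideline (2): report the failed log directory first ...
                logDirFailureChannel.maybeAddLogFailureEvent(dir);
                // ... then convert and re-throw, so the request layer answers with KAFKA_STORAGE_ERROR.
                throw new KafkaStorageException("I/O error while appending records in dir " + dir, e);
            }
        }

        private void writeToSegment(byte[] record) throws IOException {
            // placeholder for real segment I/O
        }
    }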
Therefore when a new server-side error is added, + * we may need extra logic to convert the new error code to another existing error code before sending the response back to + * the client if the request version suggests that the client may not recognize the new error code. + * * Do not add exceptions that occur only on the client or only on the server here. */ public enum Errors { - UNKNOWN(-1, "The server experienced an unexpected error when processing the request", + UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request", new ApiExceptionBuilder() { @Override public ApiException build(String message) { @@ -495,7 +501,14 @@ public enum Errors { public ApiException build(String message) { return new OperationNotAttemptedException(message); } - }); + }), + KAFKA_STORAGE_ERROR(56, "Disk error when trying to access log file on the disk.", + new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new KafkaStorageException(message); + } + }); private interface ApiExceptionBuilder { ApiException build(String message); @@ -588,7 +601,7 @@ public enum Errors { return error; } else { log.warn("Unexpected error code: {}.", code); - return UNKNOWN; + return UNKNOWN_SERVER_ERROR; } } @@ -604,7 +617,7 @@ public enum Errors { return error; clazz = clazz.getSuperclass(); } - return UNKNOWN; + return UNKNOWN_SERVER_ERROR; } private static String toHtml() { http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 383332b..329d99b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -78,6 +78,9 @@ public class Protocol { "topics that don't exist will be created by the broker. " + "Otherwise, no topics will be created by the broker.")); + /* The v5 metadata request is the same as v4. An additional field for offline_replicas has been added to the v5 metadata response */ + public static final Schema METADATA_REQUEST_V5 = METADATA_REQUEST_V4; + public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."), new Field("host", STRING, "The hostname of the broker."), new Field("port", INT32, @@ -121,12 +124,40 @@ public class Protocol { public static final Schema PARTITION_METADATA_V1 = PARTITION_METADATA_V0; + // PARTITION_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline. 
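On the client side this list ultimately surfaces through the new PartitionInfo.offlineReplicas() accessor shown earlier in this diff. A hedged sketch of reading it with the standard consumer API (the bootstrap address and topic name are placeholders); the v2 partition metadata schema that carries the field follows the sketch.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;

    public class OfflineReplicaProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                for (PartitionInfo info : consumer.partitionsFor("test-topic")) { // placeholder topic
                    for (Node offline : info.offlineReplicas())
                        System.out.printf("Partition %d has an offline replica on broker %d%n",
                                info.partition(), offline.id());
                }
            }
        }
    }

An offline replica still appears in replicas() but never in inSyncReplicas(); the separate array lets tooling distinguish a replica on a failed disk from one that is merely lagging.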
+ public static final Schema PARTITION_METADATA_V2 = new Schema(new Field("partition_error_code", + INT16, + "The error code for the partition, if any."), + new Field("partition_id", + INT32, + "The id of the partition."), + new Field("leader", + INT32, + "The id of the broker acting as leader for this partition."), + new Field("replicas", + new ArrayOf(INT32), + "The set of all nodes that host this partition."), + new Field("isr", + new ArrayOf(INT32), + "The set of nodes that are in sync with the leader for this partition."), + new Field("offline_replicas", + new ArrayOf(INT32), + "The set of offline replicas of this partition.")); + public static final Schema TOPIC_METADATA_V1 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."), new Field("topic", STRING, "The name of the topic"), new Field("is_internal", BOOLEAN, "Indicates if the topic is considered a Kafka internal topic"), new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V1), - "Metadata for each partition of the topic.")); + "Metadata for each partition of the topic.")); + + // TOPIC_METADATA_V2 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline. + public static final Schema TOPIC_METADATA_V2 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."), + new Field("topic", STRING, "The name of the topic"), + new Field("is_internal", BOOLEAN, + "Indicates if the topic is considered a Kafka internal topic"), + new Field("partition_metadata", new ArrayOf(PARTITION_METADATA_V2), + "Metadata for each partition of the topic.")); public static final Schema METADATA_RESPONSE_V1 = new Schema(new Field("brokers", new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), @@ -154,8 +185,19 @@ public class Protocol { public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3; - public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4}; - public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4}; + // METADATA_RESPONSE_V5 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline. + public static final Schema METADATA_RESPONSE_V5 = new Schema( + newThrottleTimeField(), + new Field("brokers", new ArrayOf(METADATA_BROKER_V1), + "Host and port information for all brokers."), + new Field("cluster_id", NULLABLE_STRING, + "The cluster id that this broker belongs to."), + new Field("controller_id", INT32, + "The broker id of the controller broker."), + new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V2))); + + public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4, METADATA_REQUEST_V5}; + public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4, METADATA_RESPONSE_V5}; /* Produce api */ @@ -205,6 +247,13 @@ public class Protocol { new Field("timeout", INT32, "The time to await a response in ms."), new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0))); + /** + * The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3. + * The version number is bumped up to indicate that the client supports KafkaStorageException. 
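From the producer's perspective the version bump matters because KafkaStorageException extends InvalidMetadataException, which is retriable: a client that negotiates v4 or above refreshes metadata and retries on its own. A hedged sketch (bootstrap address and topic are placeholders); the callback sees a KafkaStorageException only once retries are exhausted.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.KafkaStorageException;

    public class StorageErrorAwareSend {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("retries", "5"); // retries let the producer recover once a new leader is elected
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test-topic", "key", "value"), (metadata, exception) -> {
                    if (exception instanceof KafkaStorageException)
                        // The leader's log directory failed and no new leader appeared within the retries.
                        System.err.println("Storage error on leader: " + exception.getMessage());
                });
            }
        }
    }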
+ * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3 + */ + public static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3; + public static final Schema PRODUCE_RESPONSE_V1 = new Schema(new Field("responses", new ArrayOf(new Schema(new Field("topic", STRING), new Field("partition_responses", @@ -236,10 +285,18 @@ public class Protocol { "If LogAppendTime is used for the topic, the timestamp will be " + "the broker local time when the messages are appended."))))))), newThrottleTimeField()); + public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2; - public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3}; - public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3}; + /** + * The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3. + * The version number is bumped up to indicate that the client supports KafkaStorageException. + * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3 + */ + public static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3; + + public static final Schema[] PRODUCE_REQUEST = {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, PRODUCE_REQUEST_V4}; + public static final Schema[] PRODUCE_RESPONSE = {PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, PRODUCE_RESPONSE_V4}; /* Offset commit api */ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -666,6 +723,13 @@ public class Protocol { new ArrayOf(FETCH_REQUEST_TOPIC_V5), "Topics to fetch in the order provided.")); + /** + * The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5. + * The version number is bumped up to indicate that the client supports KafkaStorageException. + * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5 + */ + public static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5; + public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), @@ -692,7 +756,6 @@ public class Protocol { public static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1; public static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2; - // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the // last stable offset). It also exposes messages with magic v2 (along with older formats). private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema( @@ -759,8 +822,15 @@ public class Protocol { newThrottleTimeField(), new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5))); - public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5}; - public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5}; + /** + * The body of FETCH_RESPONSE_V6 is the same as FETCH_RESPONSE_V5. + * The version number is bumped up to indicate that the client supports KafkaStorageException. 
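The translation stated in the next javadoc line is implemented inline in FetchResponse.toStruct and ProduceResponse.toStruct later in this diff. Distilled into a hedged helper, where the method and parameter names are invented for illustration:

    import org.apache.kafka.common.protocol.Errors;

    // Hedged distillation of the inline version gating in FetchResponse/ProduceResponse.
    final class ErrorDownConversion {
        private ErrorDownConversion() { }

        // An old client maps the unknown KAFKA_STORAGE_ERROR code to the non-retriable
        // UnknownServerException, so pre-bump request versions are answered with
        // NOT_LEADER_FOR_PARTITION, which old clients already treat as
        // "refresh metadata and retry".
        static short maybeDownConvert(Errors error, short requestVersion, short lastVersionWithoutStorageError) {
            if (error == Errors.KAFKA_STORAGE_ERROR && requestVersion <= lastVersionWithoutStorageError)
                return Errors.NOT_LEADER_FOR_PARTITION.code();
            return error.code();
        }
    }

The cutoff is 5 for fetch and 3 for produce, matching the version bumps documented in this file.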
+ * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5 + */ + public static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5; + + public static final Schema[] FETCH_REQUEST = {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5, FETCH_REQUEST_V6}; + public static final Schema[] FETCH_RESPONSE = {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6}; /* List groups api */ public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema(); @@ -1039,6 +1109,19 @@ public class Protocol { new Field("zk_version", INT32, "The ZK version."), new Field("replicas", new ArrayOf(INT32), "The replica ids.")); + // LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 added a per-partition is_new field. + // This field specifies whether the replica should have existed on the broker or not. + public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1 = + new Schema(new Field("topic", STRING, "Topic name."), + new Field("partition", INT32, "Topic partition id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("leader", INT32, "The broker id for the leader."), + new Field("leader_epoch", INT32, "The leader epoch."), + new Field("isr", new ArrayOf(INT32), "The in sync replica ids."), + new Field("zk_version", INT32, "The ZK version."), + new Field("replicas", new ArrayOf(INT32), "The replica ids."), + new Field("is_new", BOOLEAN, "Whether the replica should have existed on the broker or not")); + public static final Schema LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0 = new Schema(new Field("id", INT32, "The broker id."), new Field("host", STRING, "The hostname of the broker."), @@ -1050,6 +1133,13 @@ public class Protocol { new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)), new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0))); + // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new field. This field specifies whether the replica should have existed on the broker or not. 
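A hedged sketch of the semantics behind is_new; the real logic lives in the Scala broker code changed by this patch (ReplicaManager.scala, Partition.scala), and every name below is illustrative only. The v1 partition-state schema carrying the flag follows the sketch.

    import org.apache.kafka.common.errors.KafkaStorageException;

    // Hedged sketch, not broker code: why the controller tells the broker whether a replica is new.
    class IsNewFlagSketch {
        void maybeCreateReplica(boolean isNew, boolean logExists, boolean someLogDirOffline) {
            if (!logExists) {
                if (isNew) {
                    // Newly assigned replica: safe to create its log on a healthy log directory.
                    createLogOnHealthyDir();
                } else if (someLogDirOffline) {
                    // The replica may live on the failed directory; recreating it elsewhere could
                    // shadow the existing data, so the broker reports a storage error instead.
                    throw new KafkaStorageException("Replica should exist but may be on an offline log directory");
                }
            }
            // ... continue with the normal make-leader / make-follower path.
        }

        private void createLogOnHealthyDir() { /* placeholder */ }
    }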
+ public static final Schema LEADER_AND_ISR_REQUEST_V1 = new Schema(new Field("controller_id", INT32, "The controller id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("partition_states", + new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V1)), + new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0))); + public static final Schema LEADER_AND_ISR_RESPONSE_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."), new Field("partition", INT32, "Topic partition id."), new Field("error_code", INT16, "Error code.")); @@ -1058,8 +1148,11 @@ public class Protocol { new Field("partitions", new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0))); - public static final Schema[] LEADER_AND_ISR_REQUEST = {LEADER_AND_ISR_REQUEST_V0}; - public static final Schema[] LEADER_AND_ISR_RESPONSE = {LEADER_AND_ISR_RESPONSE_V0}; + // LeaderAndIsrResponse V1 may receive KAFKA_STORAGE_ERROR in the response + public static final Schema LEADER_AND_ISR_RESPONSE_V1 = LEADER_AND_ISR_RESPONSE_V0; + + public static final Schema[] LEADER_AND_ISR_REQUEST = {LEADER_AND_ISR_REQUEST_V0, LEADER_AND_ISR_REQUEST_V1}; + public static final Schema[] LEADER_AND_ISR_RESPONSE = {LEADER_AND_ISR_RESPONSE_V0, LEADER_AND_ISR_RESPONSE_V1}; /* Replica api */ public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."), @@ -1141,6 +1234,18 @@ public class Protocol { public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2; + // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline. + public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 = + new Schema(new Field("topic", STRING, "Topic name."), + new Field("partition", INT32, "Topic partition id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("leader", INT32, "The broker id for the leader."), + new Field("leader_epoch", INT32, "The leader epoch."), + new Field("isr", new ArrayOf(INT32), "The in sync replica ids."), + new Field("zk_version", INT32, "The ZK version."), + new Field("replicas", new ArrayOf(INT32), "The replica ids."), + new Field("offline_replicas", new ArrayOf(INT32), "The offline replica ids")); + public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V3 = new Schema(new Field("port", INT32, "The port on which the broker accepts requests."), new Field("host", STRING, "The hostname of the broker."), @@ -1158,12 +1263,21 @@ public class Protocol { new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V3)), new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3))); + // UPDATE_METADATA_REQUEST_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline. 
+ public static final Schema UPDATE_METADATA_REQUEST_V4 = + new Schema(new Field("controller_id", INT32, "The controller id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V4)), + new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3))); + public static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2; + public static final Schema UPDATE_METADATA_RESPONSE_V4 = UPDATE_METADATA_RESPONSE_V3; + public static final Schema[] UPDATE_METADATA_REQUEST = {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, - UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3}; + UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3, UPDATE_METADATA_REQUEST_V4}; public static final Schema[] UPDATE_METADATA_RESPONSE = {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, - UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3}; + UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3, UPDATE_METADATA_RESPONSE_V4}; /* SASL handshake api */ public static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema( http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 35431d8..a898634 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -172,6 +172,13 @@ public class FileRecords extends AbstractRecords implements Closeable { } /** + * Close file handlers used by the FileChannel but don't write to disk. This is used when the disk may have failed + */ + public void closeHandlers() throws IOException { + channel.close(); + } + + /** * Delete this message set from the filesystem * @return True iff this message set was deleted. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 824a76f..f0f516c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -203,7 +203,7 @@ public class FetchResponse extends AbstractResponse { long logStartOffset = INVALID_LOG_START_OFFSET; if (partitionResponseHeader.hasField(LOG_START_OFFSET_KEY_NAME)) logStartOffset = partitionResponseHeader.getLong(LOG_START_OFFSET_KEY_NAME); - + Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME); List<AbortedTransaction> abortedTransactions = null; @@ -326,10 +326,17 @@ public class FetchResponse extends AbstractResponse { List<Struct> partitionArray = new ArrayList<>(); for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) { PartitionData fetchPartitionData = partitionEntry.getValue(); + short errorCode = fetchPartitionData.error.code(); + // If consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code + // for KafkaStorageException. 
In this case the client library will translate KafkaStorageException to + // UnknownServerException which is not retriable. We can ensure that consumer will update metadata and retry + // by converting the KafkaStorageException to NotLeaderForPartitionException in the response if FetchRequest version <= 5 + if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 5) + errorCode = Errors.NOT_LEADER_FOR_PARTITION.code(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME); partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code()); + partitionDataHeader.set(ERROR_CODE_KEY_NAME, errorCode); partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark); if (partitionDataHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 1fdb4a2..733c9af 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -45,6 +45,7 @@ public class LeaderAndIsrRequest extends AbstractRequest { private static final String ISR_KEY_NAME = "isr"; private static final String ZK_VERSION_KEY_NAME = "zk_version"; private static final String REPLICAS_KEY_NAME = "replicas"; + private static final String IS_NEW_KEY_NAME = "is_new"; // live_leaders key names private static final String END_POINT_ID_KEY_NAME = "id"; @@ -57,9 +58,9 @@ public class LeaderAndIsrRequest extends AbstractRequest { private final Map<TopicPartition, PartitionState> partitionStates; private final Set<Node> liveLeaders; - public Builder(int controllerId, int controllerEpoch, + public Builder(short version, int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates, Set<Node> liveLeaders) { - super(ApiKeys.LEADER_AND_ISR); + super(ApiKeys.LEADER_AND_ISR, version); this.controllerId = controllerId; this.controllerEpoch = controllerEpoch; this.partitionStates = partitionStates; @@ -121,10 +122,10 @@ public class LeaderAndIsrRequest extends AbstractRequest { List<Integer> replicas = new ArrayList<>(replicasArray.length); for (Object r : replicasArray) replicas.add((Integer) r); + boolean isNew = partitionStateData.hasField(IS_NEW_KEY_NAME) ? 
partitionStateData.getBoolean(IS_NEW_KEY_NAME) : false; - PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas); + PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, isNew); partitionStates.put(new TopicPartition(topic, partition), partitionState); - } Set<Node> leaders = new HashSet<>(); @@ -162,6 +163,8 @@ public class LeaderAndIsrRequest extends AbstractRequest { partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray()); partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion); partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray()); + if (partitionStateData.hasField(IS_NEW_KEY_NAME)) + partitionStateData.set(IS_NEW_KEY_NAME, partitionState.isNew); partitionStatesData.add(partitionStateData); } struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); @@ -188,6 +191,7 @@ public class LeaderAndIsrRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: + case 1: return new LeaderAndIsrResponse(Errors.NONE, responses); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 0493f3d..8aa2fc3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -135,6 +135,7 @@ public class MetadataRequest extends AbstractRequest { return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas); case 3: case 4: + case 5: return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index b798764..10f5c13 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -80,6 +80,7 @@ public class MetadataResponse extends AbstractResponse { private static final String LEADER_KEY_NAME = "leader"; private static final String REPLICAS_KEY_NAME = "replicas"; private static final String ISR_KEY_NAME = "isr"; + private static final String OFFLINE_REPLICAS_KEY_NAME = "offline_replicas"; private final int throttleTimeMs; private final Collection<Node> brokers; @@ -149,26 +150,18 @@ public class MetadataResponse extends AbstractResponse { int partition = partitionInfo.getInt(PARTITION_KEY_NAME); int leader = partitionInfo.getInt(LEADER_KEY_NAME); Node leaderNode = leader == -1 ? 
null : brokers.get(leader); - Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); - List<Node> replicaNodes = new ArrayList<>(replicas.length); - for (Object replicaNodeId : replicas) { - if (brokers.containsKey(replicaNodeId)) - replicaNodes.add(brokers.get(replicaNodeId)); - else - replicaNodes.add(new Node((int) replicaNodeId, "", -1)); - } + Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); + List<Node> replicaNodes = convertToNodes(brokers, replicas); Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); - List<Node> isrNodes = new ArrayList<>(isr.length); - for (Object isrNode : isr) { - if (brokers.containsKey(isrNode)) - isrNodes.add(brokers.get(isrNode)); - else - isrNodes.add(new Node((int) isrNode, "", -1)); - } + List<Node> isrNodes = convertToNodes(brokers, isr); + + Object[] offlineReplicas = partitionInfo.hasField(OFFLINE_REPLICAS_KEY_NAME) ? + (Object[]) partitionInfo.get(OFFLINE_REPLICAS_KEY_NAME) : new Object[0]; + List<Node> offlineNodes = convertToNodes(brokers, offlineReplicas); - partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes)); + partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, replicaNodes, isrNodes, offlineNodes)); } topicMetadata.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadata)); @@ -179,6 +172,16 @@ public class MetadataResponse extends AbstractResponse { this.topicMetadata = topicMetadata; } + private List<Node> convertToNodes(Map<Integer, Node> brokers, Object[] brokerIds) { + List<Node> nodes = new ArrayList<>(brokerIds.length); + for (Object brokerId : brokerIds) + if (brokers.containsKey(brokerId)) + nodes.add(brokers.get(brokerId)); + else + nodes.add(new Node((int) brokerId, "", -1)); + return nodes; + } + private Node getControllerNode(int controllerId, Collection<Node> brokers) { for (Node broker : brokers) { if (broker.id() == controllerId) @@ -256,7 +259,8 @@ public class MetadataResponse extends AbstractResponse { partitionMetadata.partition, partitionMetadata.leader, partitionMetadata.replicas.toArray(new Node[0]), - partitionMetadata.isr.toArray(new Node[0]))); + partitionMetadata.isr.toArray(new Node[0]), + partitionMetadata.offlineReplicas.toArray(new Node[0]))); } } @@ -334,23 +338,27 @@ public class MetadataResponse extends AbstractResponse { } + // This is used to describe per-partition state in the MetadataResponse public static class PartitionMetadata { private final Errors error; private final int partition; private final Node leader; private final List<Node> replicas; private final List<Node> isr; + private final List<Node> offlineReplicas; public PartitionMetadata(Errors error, int partition, Node leader, List<Node> replicas, - List<Node> isr) { + List<Node> isr, + List<Node> offlineReplicas) { this.error = error; this.partition = partition; this.leader = leader; this.replicas = replicas; this.isr = isr; + this.offlineReplicas = offlineReplicas; } public Errors error() { @@ -373,6 +381,10 @@ public class MetadataResponse extends AbstractResponse { return isr; } + public List<Node> offlineReplicas() { + return offlineReplicas; + } + @Override public String toString() { return "(type=PartitionMetadata," + @@ -380,7 +392,8 @@ public class MetadataResponse extends AbstractResponse { ", partition=" + partition + ", leader=" + leader + ", replicas=" + Utils.join(replicas, ",") + - ", isr=" + Utils.join(isr, ",") + ')'; + ", isr=" + Utils.join(isr, ",") + + ", offlineReplicas=" + 
Utils.join(offlineReplicas, ",") + ')'; } } @@ -433,6 +446,12 @@ public class MetadataResponse extends AbstractResponse { for (Node node : partitionMetadata.isr) isr.add(node.id()); partitionData.set(ISR_KEY_NAME, isr.toArray()); + if (partitionData.hasField(OFFLINE_REPLICAS_KEY_NAME)) { + ArrayList<Integer> offlineReplicas = new ArrayList<>(partitionMetadata.offlineReplicas.size()); + for (Node node : partitionMetadata.offlineReplicas) + offlineReplicas.add(node.id()); + partitionData.set(OFFLINE_REPLICAS_KEY_NAME, offlineReplicas.toArray()); + } partitionMetadataArray.add(partitionData); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java index 394a60f..be303d9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Utils; import java.util.List; +// This is used to describe per-partition state in the LeaderAndIsrRequest public class PartitionState { public final int controllerEpoch; public final int leader; @@ -27,14 +28,16 @@ public class PartitionState { public final List<Integer> isr; public final int zkVersion; public final List<Integer> replicas; + public final boolean isNew; - public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas) { + public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas, boolean isNew) { this.controllerEpoch = controllerEpoch; this.leader = leader; this.leaderEpoch = leaderEpoch; this.isr = isr; this.zkVersion = zkVersion; this.replicas = replicas; + this.isNew = isNew; } @Override @@ -44,6 +47,7 @@ public class PartitionState { ", leaderEpoch=" + leaderEpoch + ", isr=" + Utils.join(isr, ",") + ", zkVersion=" + zkVersion + - ", replicas=" + Utils.join(replicas, ",") + ")"; + ", replicas=" + Utils.join(replicas, ",") + + ", isNew=" + isNew + ")"; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 3d696c1..089d199 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -246,6 +246,7 @@ public class ProduceRequest extends AbstractRequest { case 1: case 2: case 3: + case 4: return new ProduceResponse(responseMap, throttleTimeMs); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", @@ -307,6 +308,7 @@ public class ProduceRequest extends AbstractRequest { return RecordBatch.MAGIC_VALUE_V1; case 3: + case 4: return RecordBatch.MAGIC_VALUE_V2; default: http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index d42f1c6..fdfba8b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -122,9 +122,16 @@ public class ProduceResponse extends AbstractResponse { List<Struct> partitionArray = new ArrayList<>(); for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) { PartitionResponse part = partitionEntry.getValue(); + short errorCode = part.error.code(); + // If producer sends ProduceRequest V3 or earlier, the client library is not guaranteed to recognize the error code + // for KafkaStorageException. In this case the client library will translate KafkaStorageException to + // UnknownServerException which is not retriable. We can ensure that producer will update metadata and retry + // by converting the KafkaStorageException to NotLeaderForPartitionException in the response if ProduceRequest version <= 3 + if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3) + errorCode = Errors.NOT_LEADER_FOR_PARTITION.code(); Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME) .set(PARTITION_KEY_NAME, partitionEntry.getKey()) - .set(ERROR_CODE_KEY_NAME, part.error.code()) + .set(ERROR_CODE_KEY_NAME, errorCode) .set(BASE_OFFSET_KEY_NAME, part.baseOffset); if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 8f9b592..1e20866 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -74,6 +75,43 @@ public class UpdateMetadataRequest extends AbstractRequest { } } + public static final class PartitionState { + public final int controllerEpoch; + public final int leader; + public final int leaderEpoch; + public final List<Integer> isr; + public final int zkVersion; + public final List<Integer> replicas; + public final List<Integer> offlineReplicas; + + public PartitionState(int controllerEpoch, + int leader, + int leaderEpoch, + List<Integer> isr, + int zkVersion, + List<Integer> replicas, + List<Integer> offlineReplicas) { + this.controllerEpoch = controllerEpoch; + this.leader = leader; + this.leaderEpoch = leaderEpoch; + this.isr = isr; + this.zkVersion = zkVersion; + 
this.replicas = replicas;
+            this.offlineReplicas = offlineReplicas;
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionState(controllerEpoch=" + controllerEpoch +
+                ", leader=" + leader +
+                ", leaderEpoch=" + leaderEpoch +
+                ", isr=" + Arrays.toString(isr.toArray()) +
+                ", zkVersion=" + zkVersion +
+                ", replicas=" + Arrays.toString(replicas.toArray()) +
+                ", offlineReplicas=" + Arrays.toString(offlineReplicas.toArray()) + ")";
+        }
+    }
+
     public static final class Broker {
         public final int id;
         public final List<EndPoint> endPoints;
@@ -129,6 +167,7 @@
     private static final String ISR_KEY_NAME = "isr";
     private static final String ZK_VERSION_KEY_NAME = "zk_version";
     private static final String REPLICAS_KEY_NAME = "replicas";
+    private static final String OFFLINE_REPLICAS_KEY_NAME = "offline_replicas";

     // Broker key names
     private static final String BROKER_ID_KEY_NAME = "id";
@@ -146,8 +185,8 @@
     private final Map<TopicPartition, PartitionState> partitionStates;
     private final Set<Broker> liveBrokers;

-    private UpdateMetadataRequest(short version, int controllerId, int controllerEpoch, Map<TopicPartition,
-            PartitionState> partitionStates, Set<Broker> liveBrokers) {
+    private UpdateMetadataRequest(short version, int controllerId, int controllerEpoch,
+                                  Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) {
         super(version);
         this.controllerId = controllerId;
         this.controllerEpoch = controllerEpoch;
@@ -178,9 +217,16 @@
             for (Object r : replicasArray)
                 replicas.add((Integer) r);

-            PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
-            partitionStates.put(new TopicPartition(topic, partition), partitionState);
+            List<Integer> offlineReplicas = new ArrayList<>();
+            if (partitionStateData.hasField(OFFLINE_REPLICAS_KEY_NAME)) {
+                Object[] offlineReplicasArray = partitionStateData.getArray(OFFLINE_REPLICAS_KEY_NAME);
+                for (Object r : offlineReplicasArray)
+                    offlineReplicas.add((Integer) r);
+            }
+            PartitionState partitionState =
+                new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, offlineReplicas);
+            partitionStates.put(new TopicPartition(topic, partition), partitionState);
         }

         Set<Broker> liveBrokers = new HashSet<>();
@@ -245,6 +291,8 @@
             partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
             partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
             partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+            if (partitionStateData.hasField(OFFLINE_REPLICAS_KEY_NAME))
+                partitionStateData.set(OFFLINE_REPLICAS_KEY_NAME, partitionState.offlineReplicas.toArray());
             partitionStatesData.add(partitionStateData);
         }
         struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
@@ -286,7 +334,7 @@
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         short versionId = version();
-        if (versionId <= 3)
+        if (versionId <= 4)
             return new UpdateMetadataResponse(Errors.forException(e));
         else
             throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index afebd9d..dd1c79a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -124,7 +124,7 @@ public class AbstractCoordinatorTest { throw e; return false; } - }, heartbeatResponse(Errors.UNKNOWN)); + }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR)); try { coordinator.ensureActiveGroup(); @@ -499,7 +499,7 @@ public class AbstractCoordinatorTest { heartbeatReceived.set(true); return isHeartbeatRequest; } - }, heartbeatResponse(Errors.UNKNOWN)); + }, heartbeatResponse(Errors.UNKNOWN_SERVER_ERROR)); return heartbeatReceived; } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index fc0ddff..3c1b411 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -621,7 +621,7 @@ public class ConsumerCoordinatorTest { // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); - client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN)); + client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_SERVER_ERROR)); coordinator.joinGroupIfNeeded(); } @@ -1267,7 +1267,7 @@ public class ConsumerCoordinatorTest { coordinator.ensureCoordinatorReady(); // sync commit with invalid partitions should throw if we have no callback - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN)), false); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_SERVER_ERROR)), false); coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index c0edcfd..bb3162a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1979,7 +1979,8 @@ public class FetcherTest { partitionInfo.partition(), partitionInfo.leader(), Arrays.asList(partitionInfo.replicas()), - 
Arrays.asList(partitionInfo.inSyncReplicas()))); + Arrays.asList(partitionInfo.inSyncReplicas()), + Arrays.asList(partitionInfo.offlineReplicas()))); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index b655743..9bac895 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -1758,7 +1758,7 @@ public class TransactionManagerTest { TransactionalRequestResult abortResult = transactionManager.beginAbort(); - prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch); + prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, consumerGroupId, pid, epoch); sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest assertFalse(abortResult.isCompleted()); http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java b/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java index 7836023..7511d64 100644 --- a/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java +++ b/clients/src/test/java/org/apache/kafka/common/PartitionInfoTest.java @@ -20,20 +20,21 @@ import org.junit.Assert; import org.junit.Test; public class PartitionInfoTest { - + @Test public void testToString() { String topic = "sample"; int partition = 0; Node leader = new Node(0, "localhost", 9092); Node r1 = new Node(1, "localhost", 9093); - Node r2 = new Node(2, "localhost", 9094); + Node r2 = new Node(2, "localhost", 9094); Node[] replicas = new Node[] {leader, r1, r2}; - Node[] inSyncReplicas = new Node[] {leader, r1, r2}; - PartitionInfo partitionInfo = new PartitionInfo(topic, partition, leader, replicas, inSyncReplicas); - - String expected = String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s)", - topic, partition, leader.idString(), "[0,1,2]", "[0,1,2]"); + Node[] inSyncReplicas = new Node[] {leader, r1}; + Node[] offlineReplicas = new Node[] {r2}; + PartitionInfo partitionInfo = new PartitionInfo(topic, partition, leader, replicas, inSyncReplicas, offlineReplicas); + + String expected = String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)", + topic, partition, leader.idString(), "[0,1,2]", "[0,1]", "[2]"); Assert.assertEquals(expected, partitionInfo.toString()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java index 4a96ac4..e424384 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java @@ -74,12 +74,12 @@ public class 
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
index 4a96ac4..e424384 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ErrorsTest.java
@@ -74,12 +74,12 @@ public class ErrorsTest {
     @Test
     public void testForExceptionDefault() {
         Errors error = Errors.forException(new ApiException());
-        assertEquals("forException should default to unknown", Errors.UNKNOWN, error);
+        assertEquals("forException should default to unknown", Errors.UNKNOWN_SERVER_ERROR, error);
     }

     @Test
     public void testExceptionName() {
-        String exceptionName = Errors.UNKNOWN.exceptionName();
+        String exceptionName = Errors.UNKNOWN_SERVER_ERROR.exceptionName();
         assertEquals("org.apache.kafka.common.errors.UnknownServerException", exceptionName);
         exceptionName = Errors.NONE.exceptionName();
         assertNull(exceptionName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a3c277f..a1c2a83 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -723,10 +723,11 @@ public class RequestResponseTest {
         Node node = new Node(1, "host1", 1001);
         List<Node> replicas = asList(node);
         List<Node> isr = asList(node);
+        List<Node> offlineReplicas = asList();

         List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
-                asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
+                asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr, offlineReplicas))));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
                 Collections.<MetadataResponse.PartitionMetadata>emptyList()));

@@ -807,18 +808,17 @@ public class RequestResponseTest {
         List<Integer> isr = asList(1, 2);
         List<Integer> replicas = asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas, false));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas, false));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas, false));

         Set<Node> leaders = Utils.mkSet(
                 new Node(0, "test0", 1223),
                 new Node(1, "test1", 1223)
         );
-
-        return new LeaderAndIsrRequest.Builder(1, 10, partitionStates, leaders).build();
+        return new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), 1, 10, partitionStates, leaders).build();
     }

     private LeaderAndIsrResponse createLeaderAndIsrResponse() {
@@ -828,15 +828,16 @@ public class RequestResponseTest {
     }

     private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
-        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+        Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = asList(1, 2);
         List<Integer> replicas = asList(1, 2, 3, 4);
+        List<Integer> offlineReplicas = asList();
         partitionStates.put(new TopicPartition("topic5", 105),
-                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas));
+                new UpdateMetadataRequest.PartitionState(0, 2, 1, isr, 2, replicas, offlineReplicas));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas));
+                new UpdateMetadataRequest.PartitionState(1, 1, 1, isr, 2, replicas, offlineReplicas));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas));
+                new UpdateMetadataRequest.PartitionState(1, 0, 1, isr, 2, replicas, offlineReplicas));

         SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT;
         List<UpdateMetadataRequest.EndPoint> endPoints1 = new ArrayList<>();
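These fixtures show the two request-side shapes changed by KIP-112: LeaderAndIsrRequest's PartitionState gains a trailing isNew flag, and UpdateMetadataRequest now carries its own PartitionState with offlineReplicas. A hedged Scala sketch mirroring the fixtures; the positional arguments are read off the test above (controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, then the new field), and the offline replica id is illustrative:

    import java.util.{Arrays => JArrays}
    import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}

    object RequestStateExample extends App {
      val isr = JArrays.asList[Integer](1, 2)
      val replicas = JArrays.asList[Integer](1, 2, 3, 4)

      // For LeaderAndIsrRequest: the trailing boolean marks a newly created replica.
      val leaderAndIsrState = new PartitionState(0, 2, 1, isr, 2, replicas, false)

      // For UpdateMetadataRequest: replica 3 sits on a failed log dir (hypothetical).
      val offline = JArrays.asList[Integer](3)
      val metadataState = new UpdateMetadataRequest.PartitionState(0, 2, 1, isr, 2, replicas, offline)
    }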
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7bd626a..2baed02 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -66,7 +66,7 @@ class AdminClient(val time: Time,
     } finally {
       pendingFutures.asScala.foreach { future =>
         try {
-          future.raise(Errors.UNKNOWN)
+          future.raise(Errors.UNKNOWN_SERVER_ERROR)
         } catch {
           case _: IllegalStateException => // It is OK if the future has been completed
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 923ceb7..7de85e4 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -408,7 +408,7 @@ object AdminUtils extends Logging with AdminUtilities {
     zkUtils.pathExists(getTopicPath(topic))

   def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
-                        brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+                         brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
     val allBrokers = zkUtils.getAllBrokersInCluster()
     val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
     val brokersWithRack = brokers.filter(_.rack.nonEmpty)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 62d5d12..05658dd 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -69,7 +69,10 @@ object ApiVersion {
     "0.11.0-IV1" -> KAFKA_0_11_0_IV1,
     // Introduced leader epoch fetches to the replica fetcher via KIP-101
     "0.11.0-IV2" -> KAFKA_0_11_0_IV2,
-    "0.11.0" -> KAFKA_0_11_0_IV2
+    "0.11.0" -> KAFKA_0_11_0_IV2,
+    // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112
+    "0.11.1-IV0" -> KAFKA_0_11_1_IV0,
+    "0.11.1" -> KAFKA_0_11_1_IV0
   )

   private val versionPattern = "\\.".r
@@ -171,3 +174,10 @@ case object KAFKA_0_11_0_IV2 extends ApiVersion {
   val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
   val id: Int = 12
 }
+
+case object KAFKA_0_11_1_IV0 extends ApiVersion {
+  val version: String = "0.11.1-IV0"
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val id: Int = 13
+}
+
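The new ApiVersion entry gates the KIP-112 wire changes behind inter.broker.protocol.version. A small sketch of how the mapping above resolves, assuming ApiVersion keeps its existing apply(String) lookup and id-based ordering:

    import kafka.api.{ApiVersion, KAFKA_0_11_1_IV0}

    object ApiVersionExample extends App {
      // Both the IV name and the release shorthand resolve to the new case object.
      assert(ApiVersion("0.11.1-IV0") == KAFKA_0_11_1_IV0)
      assert(ApiVersion("0.11.1") == KAFKA_0_11_1_IV0)
      // Ordering by id decides whether a broker may send LeaderAndIsrRequest V1,
      // UpdateMetadataRequest V4 and FetchRequest V6 to its peers.
      assert(KAFKA_0_11_1_IV0 >= ApiVersion("0.11.0"))
    }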
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index 474d7a0..e92dc33 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -50,13 +50,26 @@ case class LeaderAndIsr(leader: Int,
   }
 }

-case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int]) {
+case class LeaderAndIsrPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int], isNew: Boolean) {

   override def toString: String = {
     val partitionStateInfo = new StringBuilder
     partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
     partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
     partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
+    partitionStateInfo.append(",isNew:" + isNew + ")")
+    partitionStateInfo.toString()
+  }
+}
+
+case class MetadataPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int], offlineReplicas: Seq[Int]) {
+
+  override def toString: String = {
+    val partitionStateInfo = new StringBuilder
+    partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
+    partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
+    partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
+    partitionStateInfo.append(",OfflineReplicas:" + offlineReplicas.mkString(",") + ")")
     partitionStateInfo.toString()
   }
 }
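PartitionStateInfo is split into two case classes: LeaderAndIsrPartitionState adds isNew, letting a broker distinguish a freshly assigned replica from one whose log may live on a failed disk, while MetadataPartitionState adds offlineReplicas for metadata propagation. A construction sketch, assuming LeaderAndIsr keeps its (leader, leaderEpoch, isr, zkVersion) shape and LeaderIsrAndControllerEpoch its (leaderAndIsr, controllerEpoch) shape:

    import kafka.api.{LeaderAndIsr, LeaderAndIsrPartitionState, MetadataPartitionState}
    import kafka.controller.LeaderIsrAndControllerEpoch

    object PartitionStateSplitExample extends App {
      val epochInfo = LeaderIsrAndControllerEpoch(
        LeaderAndIsr(leader = 1, leaderEpoch = 5, isr = List(1, 2), zkVersion = 7),
        controllerEpoch = 3)
      // Shipped in LeaderAndIsrRequest: isNew tells the receiver it may create the log.
      println(LeaderAndIsrPartitionState(epochInfo, allReplicas = Seq(1, 2, 3), isNew = true))
      // Shipped in UpdateMetadataRequest: replica 3 is on a failed log dir (hypothetical).
      println(MetadataPartitionState(epochInfo, allReplicas = Seq(1, 2, 3), offlineReplicas = Seq(3)))
    }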
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ebf3140..2c4767b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,9 +16,8 @@
  */
 package kafka.cluster

-import java.io.IOException
-import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
@@ -46,12 +45,15 @@ import scala.collection.JavaConverters._
 class Partition(val topic: String,
                 val partitionId: Int,
                 time: Time,
-                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
+                replicaManager: ReplicaManager,
+                val isOffline: Boolean = false) extends Logging with KafkaMetricsGroup {
+
   val topicPartition = new TopicPartition(topic, partitionId)

-  private val localBrokerId = replicaManager.config.brokerId
-  private val logManager = replicaManager.logManager
-  private val zkUtils = replicaManager.zkUtils
+  // Do not use replicaManager if this partition is ReplicaManager.OfflinePartition
+  private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1
+  private val logManager = if (!isOffline) replicaManager.logManager else null
+  private val zkUtils = if (!isOffline) replicaManager.zkUtils else null
   private val assignedReplicaMap = new Pool[Int, Replica]
   // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock
@@ -71,56 +73,59 @@ class Partition(val topic: String,
   private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId
   val tags = Map("topic" -> topic, "partition" -> partitionId.toString)

-  newGauge("UnderReplicated",
-    new Gauge[Int] {
-      def value = {
-        if (isUnderReplicated) 1 else 0
-      }
-    },
-    tags
-  )
-
-  newGauge("InSyncReplicasCount",
-    new Gauge[Int] {
-      def value = {
-        if (isLeaderReplicaLocal) inSyncReplicas.size else 0
-      }
-    },
-    tags
-  )
-
-  newGauge("ReplicasCount",
-    new Gauge[Int] {
-      def value = {
-        if (isLeaderReplicaLocal) assignedReplicas.size else 0
-      }
-    },
-    tags
-  )
-
-  newGauge("LastStableOffsetLag",
-    new Gauge[Long] {
-      def value = {
-        leaderReplicaIfLocal.map { replica =>
-          replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
-        }.getOrElse(0)
-      }
-    },
-    tags
-  )
+  // Do not create metrics if this partition is ReplicaManager.OfflinePartition
+  if (!isOffline) {
+    newGauge("UnderReplicated",
+      new Gauge[Int] {
+        def value = {
+          if (isUnderReplicated) 1 else 0
+        }
+      },
+      tags
+    )
+
+    newGauge("InSyncReplicasCount",
+      new Gauge[Int] {
+        def value = {
+          if (isLeaderReplicaLocal) inSyncReplicas.size else 0
+        }
+      },
+      tags
+    )
+
+    newGauge("ReplicasCount",
+      new Gauge[Int] {
+        def value = {
+          if (isLeaderReplicaLocal) assignedReplicas.size else 0
+        }
+      },
+      tags
+    )
+
+    newGauge("LastStableOffsetLag",
+      new Gauge[Long] {
+        def value = {
+          leaderReplicaIfLocal.map { replica =>
+            replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset
+          }.getOrElse(0)
+        }
+      },
+      tags
+    )
+  }

   private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined

   def isUnderReplicated: Boolean =
     isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size

-  def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
+  def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
     assignedReplicaMap.getAndMaybePut(replicaId, {
       if (isReplicaLocal(replicaId)) {
         val config = LogConfig.fromProps(logManager.defaultConfig.originals,
                                          AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
-        val log = logManager.createLog(topicPartition, config)
-        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
+        val log = logManager.getOrCreateLog(topicPartition, config, isNew)
+        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
         val offsetMap = checkpoint.read
         if (!offsetMap.contains(topicPartition))
           info(s"No checkpointed highwatermark is found for partition $topicPartition")
@@ -151,14 +156,8 @@ class Partition(val topic: String,
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
-      try {
-        logManager.asyncDelete(topicPartition)
-        removePartitionMetrics()
-      } catch {
-        case e: IOException =>
-          fatal(s"Error deleting the log for partition $topicPartition", e)
-          Exit.halt(1)
-      }
+      removePartitionMetrics()
+      logManager.asyncDelete(topicPartition)
     }
   }
@@ -176,7 +175,7 @@ class Partition(val topic: String,
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.controllerEpoch
       // add replicas that are new
-      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
+      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
@@ -185,7 +184,7 @@ class Partition(val topic: String,

       //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionStateInfo.leaderEpoch
-      allReplicas.foreach(id => getOrCreateReplica(id))
+      allReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
       zkVersion = partitionStateInfo.zkVersion

       val isNewLeader =
@@ -230,7 +229,7 @@ class Partition(val topic: String,
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.controllerEpoch
       // add replicas that are new
-      allReplicas.foreach(r => getOrCreateReplica(r))
+      allReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = Set.empty[Replica]
@@ -557,7 +556,7 @@ class Partition(val topic: String,
   /**
    * remove deleted log metrics
    */
-  private def removePartitionMetrics() {
+  def removePartitionMetrics() {
     removeMetric("UnderReplicated", tags)
     removeMetric("InSyncReplicasCount", tags)
     removeMetric("ReplicasCount", tags)
@@ -565,12 +564,12 @@ class Partition(val topic: String,
   }

   override def equals(that: Any): Boolean = that match {
-    case other: Partition => partitionId == other.partitionId && topic == other.topic
+    case other: Partition => partitionId == other.partitionId && topic == other.topic && isOffline == other.isOffline
     case _ => false
   }

   override def hashCode: Int =
-    31 + topic.hashCode + 17 * partitionId
+    31 + topic.hashCode + 17 * partitionId + (if (isOffline) 1 else 0)

   override def toString: String = {
     val partitionString = new StringBuilder
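Partition now doubles as a placeholder for offline replicas: when isOffline is true the constructor never dereferences replicaManager (hence the guarded localBrokerId/logManager/zkUtils above), no gauges are registered, and equals/hashCode fold in the flag so the placeholder cannot collide with a live Partition. A hedged sketch of such a placeholder; the actual singleton lives in ReplicaManager, and the topic, partitionId, and null arguments here are illustrative:

    import org.apache.kafka.common.utils.Time
    import kafka.cluster.Partition

    object OfflinePartitionExample extends App {
      // With isOffline = true none of the replicaManager-derived fields are
      // evaluated, so passing null is safe and no metrics are registered.
      val offlinePartition = new Partition(topic = "", partitionId = -1,
        time = Time.SYSTEM, replicaManager = null, isOffline = true)
      println(offlinePartition.isOffline) // true
    }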
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc93fb4b/core/src/main/scala/kafka/common/KafkaStorageException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/KafkaStorageException.scala b/core/src/main/scala/kafka/common/KafkaStorageException.scala
index 21dd583..e0ecff3 100644
--- a/core/src/main/scala/kafka/common/KafkaStorageException.scala
+++ b/core/src/main/scala/kafka/common/KafkaStorageException.scala
@@ -17,8 +17,10 @@
 package kafka.common

 /**
- * Kafka exception caused by real IOs.
-*/
+ * Kafka exception caused by disk-related IOException
+ * This class is deprecated and will be replaced by org.apache.kafka.common.errors.KafkaStorageException
+ */
+@Deprecated
 class KafkaStorageException(message: String, t: Throwable) extends RuntimeException(message, t) {
   def this(message: String) = this(message, null)
   def this(t: Throwable) = this("", t)
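The Scala KafkaStorageException is deprecated in favor of the Java org.apache.kafka.common.errors.KafkaStorageException introduced by this patch. A hedged sketch of the intended pattern, assuming the new class has the usual Kafka (message, cause) exception constructor: wrap disk IOExceptions so the failure can be surfaced to clients as an error code instead of halting the broker:

    import java.io.IOException
    import org.apache.kafka.common.errors.{KafkaStorageException => StorageException}

    object StorageErrorExample {
      // Run the I/O action and translate a disk failure into the new exception.
      def withStorageErrorHandling[T](logDir: String)(io: => T): T = {
        try io
        catch {
          case e: IOException =>
            throw new StorageException(s"Disk error while accessing log dir $logDir", e)
        }
      }
    }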
