Repository: kafka Updated Branches: refs/heads/trunk 69d2a1771 -> 94692288b
KAFKA-5793; Tighten up the semantics of the OutOfOrderSequenceException Description of the solution can be found here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Exactly+Once+-+Solving+the+problem+of+spurious+OutOfOrderSequence+errors Author: Apurva Mehta <apu...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3865 from apurvam/KAFKA-5793-tighten-up-out-of-order-sequence-v2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94692288 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94692288 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94692288 Branch: refs/heads/trunk Commit: 94692288bedc353383e014914937a4798a1d4caa Parents: 69d2a17 Author: Apurva Mehta <apu...@confluent.io> Authored: Wed Sep 20 20:31:33 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Sep 20 20:31:33 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/RecordAccumulator.java | 31 +- .../clients/producer/internals/Sender.java | 17 +- .../producer/internals/TransactionManager.java | 81 ++++- .../errors/UnknownProducerIdException.java | 32 ++ .../apache/kafka/common/protocol/Errors.java | 14 +- .../kafka/common/protocol/types/Field.java | 10 + .../kafka/common/protocol/types/Struct.java | 6 + .../kafka/common/requests/ProduceRequest.java | 14 +- .../kafka/common/requests/ProduceResponse.java | 38 ++- .../clients/producer/internals/SenderTest.java | 335 +++++++++++++++++-- .../internals/TransactionManagerTest.java | 2 +- .../common/requests/RequestResponseTest.java | 33 +- .../main/scala/kafka/cluster/Partition.scala | 6 + core/src/main/scala/kafka/log/Log.scala | 11 +- .../scala/kafka/log/ProducerStateManager.scala | 12 +- .../scala/kafka/server/ReplicaManager.scala | 12 +- .../group/GroupCoordinatorTest.scala | 6 +- .../group/GroupMetadataManagerTest.scala | 10 +- .../TransactionStateManagerTest.scala | 4 +- .../kafka/log/ProducerStateManagerTest.scala | 6 +- .../unit/kafka/server/EdgeCaseRequestTest.scala | 4 +- .../unit/kafka/server/ReplicaManagerTest.scala | 49 +++ 22 files changed, 651 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index eb162be..46cf6c4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -347,14 +347,16 @@ public final class RecordAccumulator { return numSplitBatches; } - // The deque for the partition may have to be reordered in situations where leadership changes in between - // batch drains. Since the requests are on different connections, we no longer have any guarantees about ordering - // of the responses. Hence we will have to check if there is anything out of order and ensure the batch is queued - // in the correct sequence order. 
+ // We will have to do extra work to ensure the queue is in order when requests are being retried and there are + // multiple requests in flight to that partition. If the first inflight request fails to append, then all the subsequent + // in flight requests will also fail because the sequence numbers will not be accepted. + // + // Further, once batches are being retried, we are reduced to a single in flight request for that partition. So when + // the subsequent batches come back in sequence order, they will have to be placed further back in the queue. // // Note that this assumes that all the batches in the queue which have an assigned sequence also have the current - // producer id. We will not attempt to reorder messages if the producer id has changed. - + // producer id. We will not attempt to reorder messages if the producer id has changed; we will throw an + // IllegalStateException instead. private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) { // When we are requeueing and have enabled idempotence, the reenqueued batch must always have a sequence. if (batch.baseSequence() == RecordBatch.NO_SEQUENCE) @@ -365,19 +367,16 @@ public final class RecordAccumulator { throw new IllegalStateException("We are reenqueueing a batch which is not tracked as part of the in flight " + "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence()); - // If there are no inflight batches being tracked by the transaction manager, it means that the producer - // id must have changed and the batches being re enqueued are from the old producer id. In this case - // we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence, - // or they will succeed. - if (batch.baseSequence() != transactionManager.nextBatchBySequence(batch.topicPartition).baseSequence()) { + ProducerBatch firstBatchInQueue = deque.peekFirst(); + if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { // The incoming batch can't be inserted at the front of the queue without violating the sequence ordering. // This means that the incoming batch should be placed somewhere further back. // We need to find the right place for the incoming batch and insert it there. - // We will only enter this branch if we have multiple inflights sent to different brokers, perhaps - // because a leadership change occurred in between the drains. In this scenario, responses can come - // back out of order, requiring us to re order the batches ourselves rather than relying on the - // implicit ordering guarantees of the network client which are only on a per connection basis. - + // We will only enter this branch if we have multiple inflights sent to different brokers and we need to retry + // the inflight batches. + // + // Since we reenqueue exactly one batch at a time and ensure that the queue is always ordered by sequence, it + // is a simple linear scan of a subset of the in flight batches to find the right place in the queue each time.
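For intuition, the linear scan described above amounts to roughly the following self-contained sketch (a minimal illustration with bare integer sequences and hypothetical names, not the patch itself; the real implementation, which operates on ProducerBatch, continues in the hunk below):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    final class SequenceReinsertSketch {
        // Re-insert 'seq' into a deque that is already ordered by sequence number.
        static void insertInSequenceOrder(Deque<Integer> deque, int seq) {
            List<Integer> smaller = new ArrayList<>();
            // Linear scan: pull off everything that must stay ahead of the re-enqueued batch.
            while (deque.peekFirst() != null && deque.peekFirst() < seq)
                smaller.add(deque.pollFirst());
            deque.addFirst(seq); // the retried batch lands directly behind the smaller sequences
            for (int i = smaller.size() - 1; i >= 0; i--)
                deque.addFirst(smaller.get(i)); // restore the smaller sequences, order preserved
        }

        public static void main(String[] args) {
            Deque<Integer> deque = new ArrayDeque<>(List.of(3, 7, 9));
            insertInSequenceOrder(deque, 5);
            System.out.println(deque); // prints [3, 5, 7, 9]
        }
    }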
List<ProducerBatch> orderedBatches = new ArrayList<>(); while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence()) orderedBatches.add(deque.pollFirst()); http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index ecfad70..02b79c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Measurable; @@ -511,7 +512,7 @@ public class Sender implements Runnable { this.accumulator.deallocate(batch); this.sensors.recordBatchSplit(); } else if (error != Errors.NONE) { - if (canRetry(batch, error)) { + if (canRetry(batch, response)) { log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", correlationId, batch.topicPartition, @@ -578,6 +579,7 @@ public class Sender implements Runnable { log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}", batch.producerId(), batch.topicPartition, transactionManager.lastAckedSequence(batch.topicPartition)); } + transactionManager.updateLastAckedOffset(response, batch); transactionManager.removeInFlightBatch(batch); } @@ -591,12 +593,12 @@ public class Sender implements Runnable { private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) { if (transactionManager != null) { - if (exception instanceof OutOfOrderSequenceException + if ((exception instanceof OutOfOrderSequenceException || exception instanceof UnknownProducerIdException) && !transactionManager.isTransactional() && transactionManager.hasProducerId(batch.producerId())) { - log.error("The broker received an out of order sequence number for topic-partition " + + log.error("The broker returned {} for topic-partition " + "{} at offset {}. This indicates data loss on the broker, and should be investigated.", - batch.topicPartition, baseOffset); + exception, batch.topicPartition, baseOffset); // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees // about the previously committed message. 
Note that this will discard the producer id and sequence numbers for all existing partitions. @@ -616,6 +618,7 @@ public class Sender implements Runnable { } this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); + batch.done(baseOffset, logAppendTime, exception); this.accumulator.deallocate(batch); } @@ -625,11 +628,9 @@ public class Sender implements Runnable { * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the future * batches are certain to fail with an OutOfOrderSequence exception. */ - private boolean canRetry(ProducerBatch batch, Errors error) { + private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) { return batch.attempts() < this.retries && - ((error.exception() instanceof RetriableException) || - (error.exception() instanceof OutOfOrderSequenceException - && transactionManager.canRetryOutOfOrderSequenceException(batch))); + ((response.error.exception() instanceof RetriableException) || transactionManager.canRetry(response, batch)); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index b2387a0..43abdbb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; +import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset; @@ -86,6 +87,10 @@ public class TransactionManager { // (either successfully or through a fatal failure). private final Map<TopicPartition, PriorityQueue<ProducerBatch>> inflightBatchesBySequence; + // We keep track of the last acknowledged offset on a per-partition basis in order to disambiguate UnknownProducerId + // responses which are due to the retention period elapsing from those which are due to actual data loss.
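For intuition, the disambiguation this comment describes boils down to a small decision function (a minimal sketch with hypothetical names, assuming -1 denotes an unknown log start offset as in the new response field; the actual logic lives in canRetry, further down in this file's diff):

    // Illustrative summary of the UNKNOWN_PRODUCER_ID handling; not the patch itself.
    final class UnknownProducerIdDecisionSketch {
        enum Action { RETRY, RESET_SEQUENCES_AND_RETRY, FAIL }

        static Action onUnknownProducerId(long lastAckedOffset, long responseLogStartOffset,
                                          boolean sequenceHasBeenReset) {
            if (responseLogStartOffset == -1)
                return Action.RETRY; // the broker couldn't report a start offset; retry until one arrives
            if (sequenceHasBeenReset)
                return Action.RETRY; // an earlier failure already restarted this partition's sequences
            if (lastAckedOffset < responseLogStartOffset)
                return Action.RESET_SEQUENCES_AND_RETRY; // acked records fell off the log: retention, not loss
            return Action.FAIL; // acked records should still be present: treat as real data loss
        }
    }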
+ private final Map<TopicPartition, Long> lastAckedOffset; + private final PriorityQueue<TxnRequestHandler> pendingRequests; private final Set<TopicPartition> newPartitionsInTransaction; private final Set<TopicPartition> pendingPartitionsInTransaction; @@ -182,6 +187,7 @@ public class TransactionManager { this.partitionsWithUnresolvedSequences = new HashSet<>(); this.inflightBatchesBySequence = new HashMap<>(); + this.lastAckedOffset = new HashMap<>(); this.retryBackoffMs = retryBackoffMs; } @@ -390,6 +396,7 @@ public class TransactionManager { this.lastAckedSequence.clear(); this.inflightBatchesBySequence.clear(); this.partitionsWithUnresolvedSequences.clear(); + this.lastAckedOffset.clear(); } /** @@ -454,6 +461,21 @@ public class TransactionManager { return currentLastAckedSequence; } + synchronized long lastAckedOffset(TopicPartition topicPartition) { + Long offset = lastAckedOffset.get(topicPartition); + if (offset == null) + return -1; + return offset; + } + + synchronized void updateLastAckedOffset(ProduceResponse.PartitionResponse response, ProducerBatch batch) { + if (response.baseOffset == ProduceResponse.INVALID_OFFSET) + return; + long lastOffset = response.baseOffset + batch.recordCount - 1; + if (lastOffset > lastAckedOffset(batch.topicPartition)) + lastAckedOffset.put(batch.topicPartition, lastOffset); + } + // If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted // so that they don't fail with the OutOfOrderSequenceException. // @@ -486,6 +508,20 @@ public class TransactionManager { } } + private synchronized void startSequencesAtBeginning(TopicPartition topicPartition) { + int sequence = 0; + for (ProducerBatch inFlightBatch : inflightBatchesBySequence.get(topicPartition)) { + log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", + inFlightBatch.baseSequence(), inFlightBatch.topicPartition, sequence); + inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), + inFlightBatch.producerEpoch()), sequence, inFlightBatch.isTransactional()); + + sequence += inFlightBatch.recordCount; + } + setNextSequence(topicPartition, sequence); + lastAckedSequence.remove(topicPartition); + } + synchronized boolean hasInflightBatches(TopicPartition topicPartition) { return inflightBatchesBySequence.containsKey(topicPartition) && !inflightBatchesBySequence.get(topicPartition).isEmpty(); } @@ -632,9 +668,48 @@ public class TransactionManager { return currentState == State.IN_TRANSACTION || isCompleting() || hasAbortableError(); } - synchronized boolean canRetryOutOfOrderSequenceException(ProducerBatch batch) { - return hasProducerId(batch.producerId()) && !hasUnresolvedSequence(batch.topicPartition) && - (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence())); + synchronized boolean canRetry(ProduceResponse.PartitionResponse response, ProducerBatch batch) { + if (!hasProducerId(batch.producerId())) + return false; + + Errors error = response.error; + if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && !hasUnresolvedSequence(batch.topicPartition) && + (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()))) + // We should retry the OutOfOrderSequenceException if the batch is _not_ the next batch, ie. its base + // sequence isn't the lastAckedSequence + 1. 
However, if the first in flight batch fails fatally, we will + // adjust the sequences of the other inflight batches to account for the 'loss' of the sequence range in + // the batch which failed. In this case, an inflight batch will have a base sequence which is + // the lastAckedSequence + 1 after adjustment. When this batch fails with an OutOfOrderSequence, we want to retry it. + // To account for the latter case, we check whether the sequence has been reset since the last drain. + // If it has, we will retry it anyway. + return true; + + if (error == Errors.UNKNOWN_PRODUCER_ID) { + if (response.logStartOffset == -1) + // We don't know the log start offset with this response. We should just retry the request until we get it. + // The UNKNOWN_PRODUCER_ID error code was added along with the new ProduceResponse which includes the + // logStartOffset. So the '-1' sentinel is not for backward compatibility. Instead, it is possible for + // a broker to not know the logStartOffset when it is returning the response because the partition + // may have moved away from the broker from the time the error was initially raised to the time the + // response was being constructed. In these cases, we should just retry the request: we are guaranteed + // to eventually get a logStartOffset once things settle down. + return true; + + if (batch.sequenceHasBeenReset()) { + // When the first inflight batch fails due to the truncation case, the sequences of all the other + // in flight batches will have been restarted from the beginning. However, when those responses + // come back from the broker, they will also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not + // reset the sequence numbers to the beginning. + return true; + } else if (lastAckedOffset(batch.topicPartition) < response.logStartOffset) { + // The head of the log has been removed, probably due to the retention time elapsing. In this case, + // we expect to lose the producer state. Reset the sequences of all inflight batches to be from the beginning + // and retry them. + startSequencesAtBeginning(batch.topicPartition); + return true; + } + } + return false; } // visible for testing http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/errors/UnknownProducerIdException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownProducerIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownProducerIdException.java new file mode 100644 index 0000000..ce17345 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownProducerIdException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * This exception is raised by the broker if it could not locate the producer metadata associated with the producerId + * in question. This could happen if, for instance, the producer's records were deleted because their retention time + * had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, + * and future appends by the producer will return this exception. + */ +public class UnknownProducerIdException extends OutOfOrderSequenceException { + + public UnknownProducerIdException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 1039ca0..d9a95d4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -72,6 +72,7 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; @@ -524,7 +525,18 @@ public enum Errors { public ApiException build(String message) { return new AuthenticationFailedException(message); } - }); + }), + UNKNOWN_PRODUCER_ID(59, "This exception is raised by the broker if it could not locate the producer metadata " + + "associated with the producerId in question. This could happen if, for instance, the producer's records " + + "were deleted because their retention time had elapsed. 
Once the last records of the producerId are " + + "removed, the producer's metadata is removed from the broker, and future appends by the producer will " + + "return this exception.", + new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new UnknownProducerIdException(message); + } + }); private interface ApiExceptionBuilder { ApiException build(String message); http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java index 8da848b..ec217f5 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java @@ -62,6 +62,16 @@ public class Field { } } + public static class Int64 extends Field { + public Int64(String name, String docString) { + super(name, Type.INT64, docString, false, null); + } + + public Int64(String name, String docString, long defaultValue) { + super(name, Type.INT64, docString, true, defaultValue); + } + } + public static class Int16 extends Field { public Int16(String name, String docString) { super(name, Type.INT16, docString, false, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index b3e9975..1cbbcb3 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -95,6 +95,12 @@ public class Struct { return getString(field.name); } + public Long getOrElse(Field.Int64 field, long alternative) { + if (hasField(field.name)) + return getLong(field.name); + return alternative; + } + public Integer getOrElse(Field.Int32 field, int alternative) { if (hasField(field.name)) return getInt(field.name); http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index eac7661..8ab0b20 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -102,9 +102,17 @@ public class ProduceRequest extends AbstractRequest { */ private static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3; + /** + * The body of the PRODUCE_REQUEST_V5 is the same as PRODUCE_REQUEST_V4. + * The version number is bumped since the PRODUCE_RESPONSE_V5 includes an additional partition level + * field: the log_start_offset. 
+ */ + private static final Schema PRODUCE_REQUEST_V5 = PRODUCE_REQUEST_V4; + + public static Schema[] schemaVersions() { return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3, - PRODUCE_REQUEST_V4}; + PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5}; } public static class Builder extends AbstractRequest.Builder<ProduceRequest> { @@ -119,7 +127,7 @@ public class ProduceRequest extends AbstractRequest { int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) { - super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? 3 : 2)); + super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? ApiKeys.PRODUCE.latestVersion() : 2)); this.magic = magic; this.acks = acks; this.timeout = timeout; @@ -304,6 +312,7 @@ public class ProduceRequest extends AbstractRequest { case 2: case 3: case 4: + case 5: return new ProduceResponse(responseMap, throttleTimeMs); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", @@ -366,6 +375,7 @@ public class ProduceRequest extends AbstractRequest { case 3: case 4: + case 5: return RecordBatch.MAGIC_VALUE_V2; default: http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 4786307..e1978dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -71,6 +71,10 @@ public class ProduceResponse extends AbstractResponse { private static final String BASE_OFFSET_KEY_NAME = "base_offset"; private static final String LOG_APPEND_TIME_KEY_NAME = "log_append_time"; + private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset"; + + private static final Field.Int64 LOG_START_OFFSET_FIELD = new Field.Int64(LOG_START_OFFSET_KEY_NAME, + "The start offset of the log at the time this produce response was created", INVALID_OFFSET); private static final Schema PRODUCE_RESPONSE_V0 = new Schema( new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( @@ -116,9 +120,29 @@ */ private static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3; + + /** + * Add in the log_start_offset field to the partition response to filter out spurious OutOfOrderSequenceExceptions + * on the client. + */ + public static final Schema PRODUCE_RESPONSE_V5 = new Schema( + new Field(RESPONSES_KEY_NAME, new ArrayOf(new Schema( + TOPIC_NAME, + new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( + PARTITION_ID, + ERROR_CODE, + new Field(BASE_OFFSET_KEY_NAME, INT64), + new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + + "the messages. If CreateTime is used for the topic, the timestamp will be -1.
" + + "If LogAppendTime is used for the topic, the timestamp will be the broker local " + + "time when the messages are appended."), + LOG_START_OFFSET_FIELD)))))), + THROTTLE_TIME_MS); + + public static Schema[] schemaVersions() { return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3, - PRODUCE_RESPONSE_V4}; + PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5}; } private final Map<TopicPartition, PartitionResponse> responses; @@ -156,8 +180,9 @@ public class ProduceResponse extends AbstractResponse { Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE)); long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME); + long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(error, offset, logAppendTime)); + responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset)); } } this.throttleTime = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); @@ -188,6 +213,7 @@ public class ProduceResponse extends AbstractResponse { .set(BASE_OFFSET_KEY_NAME, part.baseOffset); if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); + partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset); partitionArray.add(partStruct); } topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); @@ -211,15 +237,17 @@ public class ProduceResponse extends AbstractResponse { public Errors error; public long baseOffset; public long logAppendTime; + public long logStartOffset; public PartitionResponse(Errors error) { - this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP); + this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, INVALID_OFFSET); } - public PartitionResponse(Errors error, long baseOffset, long logAppendTime) { + public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset) { this.error = error; this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; + this.logStartOffset = logStartOffset; } @Override @@ -232,6 +260,8 @@ public class ProduceResponse extends AbstractResponse { b.append(baseOffset); b.append(",logAppendTime: "); b.append(logAppendTime); + b.append(", logStartOffset: "); + b.append(logStartOffset); b.append('}'); return b.toString(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index a64bc56..0995e36 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -203,7 +203,7 @@ public class SenderTest { // start off support produce request v3 apiVersions.update("0", NodeApiVersions.create()); - ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.NONE, offset, RecordBatch.NO_TIMESTAMP); + ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(Errors.NONE, offset, RecordBatch.NO_TIMESTAMP, 100); Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = new HashMap<>(); 
partResp.put(tp0, resp); partResp.put(tp1, resp); @@ -550,7 +550,8 @@ public class SenderTest { @Test - public void testIdempotenceWithMultipleInflightsFirstFails() throws Exception { + public void testIdempotenceWithMultipleInflightsRetriedInOrder() throws Exception { + // Send multiple in flight requests, retry them all one at a time, in the correct order. final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); setupWithTransactionState(transactionManager); @@ -571,51 +572,79 @@ public class SenderTest { // Send second ProduceRequest Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); - assertEquals(2, client.inFlightRequestCount()); - assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + + // Send third ProduceRequest + Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + assertEquals(3, client.inFlightRequestCount()); + assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); assertEquals(-1, transactionManager.lastAckedSequence(tp0)); assertFalse(request1.isDone()); assertFalse(request2.isDone()); + assertFalse(request3.isDone()); assertTrue(client.isReady(node, time.milliseconds())); sendIdempotentProducerResponse(0, tp0, Errors.LEADER_NOT_AVAILABLE, -1L); - sender.run(time.milliseconds()); // receive response 0 - assertEquals(1, client.inFlightRequestCount()); + // Queue the fourth request, it shouldn't be sent until the first 3 complete. + Future<RecordMetadata> request4 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + + assertEquals(2, client.inFlightRequestCount()); assertEquals(-1, transactionManager.lastAckedSequence(tp0)); sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L); + sender.run(time.milliseconds()); // re send request 1, receive response 2 - sender.run(time.milliseconds()); // re send request 0, receive response 1 + sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L); + sender.run(time.milliseconds()); // receive response 3 assertEquals(-1, transactionManager.lastAckedSequence(tp0)); assertEquals(1, client.inFlightRequestCount()); sender.run(time.milliseconds()); // Do nothing, we are reduced to one in flight request during retries. + assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented. 
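+ // (While the earlier batches are being retried, the accumulator drains at most one in-flight batch for the partition, so the batch for request 4 stays queued with no sequence assigned yet.)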
assertEquals(1, client.inFlightRequestCount()); assertEquals(-1, transactionManager.lastAckedSequence(tp0)); sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L); - sender.run(time.milliseconds()); // receive response 0 + sender.run(time.milliseconds()); // receive response 1 assertEquals(0, transactionManager.lastAckedSequence(tp0)); - assertEquals(0, client.inFlightRequestCount()); - - assertFalse(request2.isDone()); assertTrue(request1.isDone()); assertEquals(0, request1.get().offset()); - sender.run(time.milliseconds()); // send request 1 + + assertFalse(client.hasInFlightRequests()); + sender.run(time.milliseconds()); // send request 2; assertEquals(1, client.inFlightRequestCount()); - sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L); - sender.run(time.milliseconds()); // receive response 1 + sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L); + sender.run(time.milliseconds()); // receive response 2 + assertEquals(1, transactionManager.lastAckedSequence(tp0)); assertTrue(request2.isDone()); assertEquals(1, request2.get().offset()); + assertFalse(client.hasInFlightRequests()); - assertEquals(1, transactionManager.lastAckedSequence(tp0)); + + sender.run(time.milliseconds()); // send request 3 + assertEquals(1, client.inFlightRequestCount()); + + sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L); + sender.run(time.milliseconds()); // receive response 3, send request 4 since we are out of 'retry' mode. + assertEquals(2, transactionManager.lastAckedSequence(tp0)); + assertTrue(request3.isDone()); + assertEquals(2, request3.get().offset()); + + assertEquals(1, client.inFlightRequestCount()); + + sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L); + sender.run(time.milliseconds()); // receive response 4 + assertEquals(3, transactionManager.lastAckedSequence(tp0)); + assertTrue(request4.isDone()); + assertEquals(3, request4.get().offset()); } @Test @@ -1118,10 +1147,11 @@ public class SenderTest { ClientRequest firstClientRequest = client.requests().peek(); ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1]; - client.respondToRequest(secondClientRequest, produceResponse(tp0, 1, Errors.NONE, -1)); + client.respondToRequest(secondClientRequest, produceResponse(tp0, 1000, Errors.NONE, 0)); sender.run(time.milliseconds()); // receive response 1 + assertEquals(1000, transactionManager.lastAckedOffset(tp0)); assertEquals(1, transactionManager.lastAckedSequence(tp0)); client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.DUPLICATE_SEQUENCE_NUMBER, -1)); @@ -1130,10 +1160,267 @@ public class SenderTest { // Make sure that the last ack'd sequence doesn't change. 
assertEquals(1, transactionManager.lastAckedSequence(tp0)); + assertEquals(1000, transactionManager.lastAckedOffset(tp0)); + assertFalse(client.hasInFlightRequests()); + } + + @Test + public void testUnknownProducerHandlingWhenRetentionLimitReached() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + + // Send first ProduceRequest + Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(-1, transactionManager.lastAckedSequence(tp0)); + + sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); + + sender.run(time.milliseconds()); // receive the response. + + assertTrue(request1.isDone()); + assertEquals(1000L, request1.get().offset()); + assertEquals(0L, transactionManager.lastAckedSequence(tp0)); + assertEquals(1000L, transactionManager.lastAckedOffset(tp0)); + + // Send second ProduceRequest, a single batch with 2 records. + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT); + Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.lastAckedSequence(tp0)); + + assertFalse(request2.isDone()); + + sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L); + sender.run(time.milliseconds()); // receive response 0, should be retried since the logStartOffset > lastAckedOffset. + + // We should have reset the sequence number state of the partition because the state was lost on the broker. + assertEquals(-1, transactionManager.lastAckedSequence(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertFalse(request2.isDone()); + assertFalse(client.hasInFlightRequests()); + + sender.run(time.milliseconds()); // should retry request 1 + + // resend the request. Note that the expected sequence is 0, since we have lost producer state on the broker. 
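+ // (The reset happened because the UNKNOWN_PRODUCER_ID response carried a logStartOffset of 1010, past the lastAckedOffset of 1000: the acked records were removed by retention, so losing the producer state on the broker is expected rather than an error.)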
+ sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L); + sender.run(time.milliseconds()); // receive response 1 + assertEquals(1, transactionManager.lastAckedSequence(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertFalse(client.hasInFlightRequests()); + assertTrue(request2.isDone()); + assertEquals(1012L, request2.get().offset()); + assertEquals(1012L, transactionManager.lastAckedOffset(tp0)); + } + + @Test + public void testUnknownProducerErrorShouldBeRetriedWhenLogStartOffsetIsUnknown() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + + // Send first ProduceRequest + Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(-1, transactionManager.lastAckedSequence(tp0)); + + sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); + + sender.run(time.milliseconds()); // receive the response. + + assertTrue(request1.isDone()); + assertEquals(1000L, request1.get().offset()); + assertEquals(0L, transactionManager.lastAckedSequence(tp0)); + assertEquals(1000L, transactionManager.lastAckedOffset(tp0)); + + // Send second ProduceRequest + Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.lastAckedSequence(tp0)); + + assertFalse(request2.isDone()); + + sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, -1L); + sender.run(time.milliseconds()); // receive response 0, should be retried without resetting the sequence numbers since the log start offset is unknown. + + // We should not have reset the sequence number state of the partition, since without a log start offset we cannot + // tell whether the producer state was actually lost on the broker. + assertEquals(0, transactionManager.lastAckedSequence(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertFalse(request2.isDone()); + assertFalse(client.hasInFlightRequests()); + + sender.run(time.milliseconds()); // should retry request 1 + + // resend the request. Note that the expected sequence is 1, since we never got the logStartOffset in the previous + // response and hence we didn't reset the sequence numbers.
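+ // (With a logStartOffset of -1 the client cannot distinguish retention-based loss from real data loss, so it keeps the original sequence and retries until a response carries a usable logStartOffset.)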
+ sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L); + sender.run(time.milliseconds()); // receive response 1 + assertEquals(1, transactionManager.lastAckedSequence(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); assertFalse(client.hasInFlightRequests()); + assertTrue(request2.isDone()); + assertEquals(1011L, request2.get().offset()); + assertEquals(1011L, transactionManager.lastAckedOffset(tp0)); } - void sendIdempotentProducerResponse(final int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) { + @Test + public void testUnknownProducerErrorShouldBeRetriedForFutureBatchesWhenFirstFails() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + + // Send first ProduceRequest + Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(-1, transactionManager.lastAckedSequence(tp0)); + + sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); + + sender.run(time.milliseconds()); // receive the response. + + assertTrue(request1.isDone()); + assertEquals(1000L, request1.get().offset()); + assertEquals(0L, transactionManager.lastAckedSequence(tp0)); + assertEquals(1000L, transactionManager.lastAckedOffset(tp0)); + + // Send second ProduceRequest + Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.lastAckedSequence(tp0)); + + // Send the third ProduceRequest, in parallel with the second. It should be retried when its UnknownProducerId + // response comes back: even though lastAckedOffset < logStartOffset at that point, its sequence will already + // have been reset, so the sequences are not restarted a second time. + Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.lastAckedSequence(tp0)); + + assertFalse(request2.isDone()); + assertFalse(request3.isDone()); + assertEquals(2, client.inFlightRequestCount()); + + + sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L); + sender.run(time.milliseconds()); // receive response 2, should reset the sequence numbers and be retried. + + // We should have reset the sequence number state of the partition because the state was lost on the broker. + assertEquals(-1, transactionManager.lastAckedSequence(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertFalse(request2.isDone()); + assertFalse(request3.isDone()); + assertEquals(1, client.inFlightRequestCount()); + + sender.run(time.milliseconds()); // resend request 2. + + assertEquals(2, client.inFlightRequestCount()); + + // Receive the original response 3. Note that the expected sequence is still the originally assigned sequence.
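+ // (Batch 3's sequence was already reset when response 2 failed, so sequenceHasBeenReset() holds and this UNKNOWN_PRODUCER_ID response is retried without restarting the sequences a second time.)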
+ sendIdempotentProducerResponse(2, tp0, Errors.UNKNOWN_PRODUCER_ID, -1, 1010L); + sender.run(time.milliseconds()); // receive response 3 + + assertEquals(1, client.inFlightRequestCount()); + assertEquals(-1, transactionManager.lastAckedSequence(tp0)); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + + sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L); + sender.run(time.milliseconds()); // receive response 2, don't send request 3 since we can have at most 1 in flight when retrying + + assertTrue(request2.isDone()); + assertFalse(request3.isDone()); + assertFalse(client.hasInFlightRequests()); + assertEquals(0, transactionManager.lastAckedSequence(tp0)); + assertEquals(1011L, request2.get().offset()); + assertEquals(1011L, transactionManager.lastAckedOffset(tp0)); + + sender.run(time.milliseconds()); // resend request 3. + assertEquals(1, client.inFlightRequestCount()); + + sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1012L, 1010L); + sender.run(time.milliseconds()); // receive response 3. + + assertFalse(client.hasInFlightRequests()); + assertTrue(request3.isDone()); + assertEquals(1012L, request3.get().offset()); + assertEquals(1012L, transactionManager.lastAckedOffset(tp0)); + } + + @Test + public void testShouldRaiseOutOfOrderSequenceExceptionToUserIfLogWasNotTruncated() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); + + // Send first ProduceRequest + Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(-1, transactionManager.lastAckedSequence(tp0)); + + sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L); + + sender.run(time.milliseconds()); // receive the response. 
+ + assertTrue(request1.isDone()); + assertEquals(1000L, request1.get().offset()); + assertEquals(0L, transactionManager.lastAckedSequence(tp0)); + assertEquals(1000L, transactionManager.lastAckedOffset(tp0)); + + // Send second ProduceRequest, + Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); + assertEquals(0, transactionManager.lastAckedSequence(tp0)); + + assertFalse(request2.isDone()); + + sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L); + sender.run(time.milliseconds()); // receive response 0, should cause a producerId reset since the logStartOffset < lastAckedOffset + + assertTrue(request2.isDone()); + try { + request2.get(); + fail("Should have raised an OutOfOrderSequenceException"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof OutOfOrderSequenceException); + } + + } + void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) { + sendIdempotentProducerResponse(expectedSequence, tp, responseError, responseOffset, -1L); + } + + void sendIdempotentProducerResponse(final int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset, long logStartOffset) { client.respond(new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { @@ -1148,7 +1435,7 @@ public class SenderTest { return true; } - }, produceResponse(tp, responseOffset, responseError, 0)); + }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset)); } @Test @@ -1432,7 +1719,7 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); - responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L)); + responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()), new ProduceResponse(responseMap)); @@ -1449,7 +1736,7 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); - responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L)); + responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L)); client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()), new ProduceResponse(responseMap)); @@ -1505,12 +1792,16 @@ public class SenderTest { } } - private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) { - ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP); + private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) { + ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset); Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp); return new ProduceResponse(partResp, throttleTimeMs); } + private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) { + return produceResponse(tp, offset, error, 
throttleTimeMs, -1L); + } + private void setupWithTransactionState(TransactionManager transactionManager) { Map<String, String> metricTags = new LinkedHashMap<>(); metricTags.put("client-id", CLIENT_ID); http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 28f9c82..8fc43df 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -2377,7 +2377,7 @@ public class TransactionManagerTest { } private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) { - ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP); + ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, 10); Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = singletonMap(tp, resp); return new ProduceResponse(partResp, throttleTimeMs); } http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index cc65003..e2e458d1 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -340,10 +340,39 @@ public class RequestResponseTest { } @Test + public void produceResponseV5Test() { + Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); + TopicPartition tp0 = new TopicPartition("test", 0); + responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, + 10000, RecordBatch.NO_TIMESTAMP, 100)); + + ProduceResponse v5Response = new ProduceResponse(responseData, 10); + short version = 5; + + ByteBuffer buffer = v5Response.serialize(version, new ResponseHeader(0)); + buffer.rewind(); + + ResponseHeader.parse(buffer); // throw away. 
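+ // (Parsing the header consumes its bytes, leaving the buffer positioned at the start of the response body.)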
+ + Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer); + + ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, + deserializedStruct); + + assertEquals(1, v5FromBytes.responses().size()); + assertTrue(v5FromBytes.responses().containsKey(tp0)); + ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0); + assertEquals(100, partitionResponse.logStartOffset); + assertEquals(10000, partitionResponse.baseOffset); + assertEquals(10, v5FromBytes.getThrottleTime()); + assertEquals(responseData, v5Response.responses()); + } + + @Test public void produceResponseVersionTest() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP)); + 10000, RecordBatch.NO_TIMESTAMP, 100)); ProduceResponse v0Response = new ProduceResponse(responseData); ProduceResponse v1Response = new ProduceResponse(responseData, 10); ProduceResponse v2Response = new ProduceResponse(responseData, 10); @@ -744,7 +773,7 @@ public class RequestResponseTest { private ProduceResponse createProduceResponse() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP)); + 10000, RecordBatch.NO_TIMESTAMP, 100)); return new ProduceResponse(responseData, 0); } http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3b5fee0..f6f825f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -518,6 +518,12 @@ class Partition(val topic: String, info } + def logStartOffset: Long = { + inReadLock(leaderIsrUpdateLock) { + leaderReplicaIfLocal.map(_.log.get.logStartOffset).getOrElse(-1) + } + } + /** * Update logStartOffset and low watermark if 1) offset <= highWatermark and 2) it is the leader replica. * This function can trigger log segment deletion and log rolling. http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index d98f443..f32da4b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -47,8 +47,12 @@ import java.lang.{Long => JLong} import java.util.regex.Pattern object LogAppendInfo { - val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, + val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) + + def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo = + LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, + NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) } /** @@ -60,6 +64,7 @@ object LogAppendInfo { * @param maxTimestamp The maximum timestamp of the message set. 
 * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
 * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param logStartOffset The start offset of the log at the time of this append.
 * @param sourceCodec The source codec used in the message set (send by the producer)
 * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
 * @param shallowCount The number of shallow messages
@@ -71,6 +76,7 @@ case class LogAppendInfo(var firstOffset: Long,
                          var maxTimestamp: Long,
                          var offsetOfMaxTimestamp: Long,
                          var logAppendTime: Long,
+                         var logStartOffset: Long,
                          sourceCodec: CompressionCodec,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
@@ -669,6 +675,7 @@ class Log(@volatile var dir: File,
           appendInfo.firstOffset = duplicate.firstOffset
           appendInfo.lastOffset = duplicate.lastOffset
           appendInfo.logAppendTime = duplicate.timestamp
+          appendInfo.logStartOffset = logStartOffset
           return appendInfo
         }
@@ -861,7 +868,7 @@ class Log(@volatile var dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
 
-    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec,
+    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, sourceCodec,
       targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 69d4e36..1cf9a14 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -161,9 +161,15 @@ private[log] class ProducerAppendInfo(val producerId: Long,
         s"with a newer epoch. $producerEpoch (request epoch), ${currentEntry.producerEpoch} (server epoch)")
     } else if (validateSequenceNumbers) {
       if (producerEpoch != currentEntry.producerEpoch) {
-        if (firstSeq != 0)
-          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
-            s"(request epoch), $firstSeq (seq. number)")
+        if (firstSeq != 0) {
+          if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
+            throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
+              s"(request epoch), $firstSeq (seq. number)")
+          } else {
+            throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
+              s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
+          }
+        }
       } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
         // the epoch was bumped by a control record, so we expect the sequence number to be reset
         throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 064472d..9cc6317 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -445,7 +445,7 @@ class ReplicaManager(val config: KafkaConfig,
         topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
+                  new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime, result.info.logStartOffset)) // response status
       }
 
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
@@ -471,7 +471,7 @@ class ReplicaManager(val config: KafkaConfig,
       // Just return an error and don't handle the request at all
       val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
         topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
-          LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP)
+          LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
       }
       responseCallback(responseStatus)
     }
@@ -734,10 +734,16 @@ class ReplicaManager(val config: KafkaConfig,
                   _: InvalidTimestampException) =>
            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
          case t: Throwable =>
+           val logStartOffset = getPartition(topicPartition) match {
+             case Some(partition) =>
+               partition.logStartOffset
+             case _ =>
+               -1
+           }
            brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
            brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
            error("Error processing append operation on partition %s".format(topicPartition), t)
-           (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
+           (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
        }
      }
  }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 592e343..2ffd828 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1354,7 +1354,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
-          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )})
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
@@ -1438,7 +1438,7 @@
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
-          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )
     })
@@ -1467,7 +1467,7 @@
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->
-          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )})
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V2)).anyTimes()

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 24e2920..a2f5f92 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -837,7 +837,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
-      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)))
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
@@ -877,7 +877,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
-      new PartitionResponse(Errors.NOT_ENOUGH_REPLICAS, 0L, RecordBatch.NO_TIMESTAMP)))
+      new PartitionResponse(Errors.NOT_ENOUGH_REPLICAS, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertFalse(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
@@ -916,7 +916,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
-      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)))
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertTrue(group.hasOffsets)
     assertTrue(group.allOffsets.isEmpty)
@@ -995,7 +995,7 @@ class GroupMetadataManagerTest {
     groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
-      new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP)))
+      new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
@@ -1324,7 +1324,7 @@
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(groupTopicPartition ->
-          new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
+          new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )})
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index e86e088..49c8e6a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -503,7 +503,7 @@ class TransactionStateManagerTest {
       override def answer(): Unit = {
         capturedArgument.getValue.apply(
           Map(partition ->
-            new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
+            new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
           )
         )
       }
@@ -602,7 +602,7 @@ class TransactionStateManagerTest {
     ).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = capturedArgument.getValue.apply(
         Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->
-          new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
+          new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
       )
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 976bbd7..9eb9ae7 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -305,7 +305,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     append(recoveredMapping, producerId, epoch, 2, 2L)
   }
 
-  @Test(expected = classOf[OutOfOrderSequenceException])
+  @Test(expected = classOf[UnknownProducerIdException])
   def testRemoveExpiredPidsOnReload(): Unit = {
     val epoch = 0.toShort
     append(stateManager, producerId, epoch, 0, 0L, 0)
@@ -482,14 +482,14 @@ class ProducerStateManagerTest extends JUnitSuite {
     append(stateManager, producerId, epoch, 2, 3L, 4L)
     stateManager.takeSnapshot()
 
-    intercept[OutOfOrderSequenceException] {
+    intercept[UnknownProducerIdException] {
       val recoveredMapping = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
       recoveredMapping.truncateAndReload(0L, 1L, time.milliseconds)
       append(recoveredMapping, pid2, epoch, 1, 4L, 5L)
     }
   }
 
-  @Test(expected = classOf[OutOfOrderSequenceException])
+  @Test(expected = classOf[UnknownProducerIdException])
   def testPidExpirationTimeout() {
     val epoch = 5.toShort
     val sequence = 37

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index a7817ad..b9a4dfe 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -114,9 +114,9 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     val correlationId = -1
     TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
 
-    val version = 2: Short
+    val version = ApiKeys.PRODUCE.latestVersion: Short
     val serializedBytes = {
-      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, ApiKeys.PRODUCE.latestVersion, null,
+      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null,
         correlationId)
       val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))
       val request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 10000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/94692288/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f5581f8..27d4312 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -175,6 +175,55 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = {
+    val timer = new MockTimer
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0)
+
+      // Make this replica the leader.
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, brokerList, 0, brokerList, true)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+      replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
+
+      val producerId = 234L
+      val epoch = 5.toShort
+
+      // write a few batches with an idempotent producer
+      val numRecords = 3
+      for (sequence <- 0 until numRecords) {
+        val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, epoch, sequence,
+          new SimpleRecord(s"message $sequence".getBytes))
+        appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response =>
+          assertEquals(Errors.NONE, response.error)
+        }
+      }
+
+      assertEquals(0, partition.logStartOffset)
+
+      // Append a record with an out of range sequence. We should get the OutOfOrderSequence error code with the log
+      // start offset set.
+      val outOfRangeSequence = numRecords + 10
+      val record = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, epoch, outOfRangeSequence,
+        new SimpleRecord(s"message: $outOfRangeSequence".getBytes))
+      appendRecords(replicaManager, new TopicPartition(topic, 0), record).onFire { response =>
+        assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, response.error)
+        assertEquals(0, response.logStartOffset)
+      }
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+
+  }
+
+  @Test
   def testReadCommittedFetchLimitedAtLSO(): Unit = {
     val timer = new MockTimer
     val replicaManager = setupReplicaManagerWithMockedPurgatories(timer)