Repository: kafka
Updated Branches:
  refs/heads/0.11.0 60d2f1294 -> 172041d57
KAFKA-5378; Return LSO in FetchResponse plus some metrics

Author: Jason Gustafson <[email protected]>

Reviewers: Jun Rao <[email protected]>, Ismael Juma <[email protected]>

Closes #3248 from hachikuji/KAFKA-5378

(cherry picked from commit dcbdce31ba525771016be5be4abc4a2067e0890b)
Signed-off-by: Ismael Juma <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/172041d5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/172041d5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/172041d5

Branch: refs/heads/0.11.0
Commit: 172041d572a53ae7984216837bf956f7f9cfd0fa
Parents: 60d2f12
Author: Jason Gustafson <[email protected]>
Authored: Wed Jun 7 16:14:09 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed Jun 7 16:46:18 2017 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |   7 +-
 .../consumer/internals/SubscriptionState.java   |  14 +-
 .../kafka/clients/producer/KafkaProducer.java   |   2 +-
 .../producer/internals/RecordAccumulator.java   |   2 +-
 .../producer/internals/TransactionManager.java  |   4 +-
 .../clients/consumer/internals/FetcherTest.java | 118 ++++---
 .../internals/TransactionManagerTest.java       |  86 +++----
 .../main/scala/kafka/cluster/Partition.scala    |  11 +
 core/src/main/scala/kafka/log/Log.scala         |  11 +-
 .../main/scala/kafka/server/DelayedFetch.scala  |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   3 +-
 .../scala/kafka/server/ReplicaManager.scala     |  42 ++--
 .../unit/kafka/server/ISRExpirationTest.scala   |  25 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 249 ++++++++++---------
 .../unit/kafka/server/SimpleFetchTest.scala     |   5 +-
 15 files changed, 332 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e73ff4e..dff223f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -571,7 +571,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     "position to {}", position, partitionRecords.partition, nextOffset);
             subscriptions.position(partitionRecords.partition, nextOffset);

-            Long partitionLag = subscriptions.partitionLag(partitionRecords.partition);
+            Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
             if (partitionLag != null)
                 this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);

@@ -855,6 +855,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                     subscriptions.updateHighWatermark(tp, partition.highWatermark);
                 }
+
+                if (partition.lastStableOffset >= 0) {
+                    log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
+                    subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
+                }
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                 log.debug("Error in fetch for partition {}: {}", tp,
error.exceptionName()); this.metadata.requestUpdate(); http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 421a3cf..e852bd2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.PartitionStates; +import org.apache.kafka.common.requests.IsolationLevel; import java.util.ArrayList; import java.util.Collection; @@ -318,15 +319,22 @@ public class SubscriptionState { return assignedState(tp).position; } - public Long partitionLag(TopicPartition tp) { + public Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) { TopicPartitionState topicPartitionState = assignedState(tp); - return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position; + if (isolationLevel == IsolationLevel.READ_COMMITTED) + return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position; + else + return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position; } public void updateHighWatermark(TopicPartition tp, long highWatermark) { assignedState(tp).highWatermark = highWatermark; } + public void updateLastStableOffset(TopicPartition tp, long lastStableOffset) { + assignedState(tp).lastStableOffset = lastStableOffset; + } + public Map<TopicPartition, OffsetAndMetadata> allConsumed() { Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>(); for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) { @@ -427,6 +435,7 @@ public class SubscriptionState { private static class TopicPartitionState { private Long position; // last consumed position private Long highWatermark; // the high watermark from last fetch + private Long lastStableOffset; private OffsetAndMetadata committed; // last committed position private boolean paused; // whether this partition has been paused by the user private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting @@ -435,6 +444,7 @@ public class SubscriptionState { this.paused = false; this.position = null; this.highWatermark = null; + this.lastStableOffset = null; this.committed = null; this.resetStrategy = null; } http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 0f109d8..a7336d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -608,7 +608,7 @@ public class 
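
For reference, the partitionLag change above makes the consumer's records-lag metrics isolation-aware: under READ_COMMITTED the lag is measured against the last stable offset (LSO), otherwise against the high watermark. A minimal standalone sketch of that rule (illustrative Java, not the internal SubscriptionState API):

    import java.util.OptionalLong;

    // Mirrors the rule in SubscriptionState.partitionLag: pick the end offset by isolation level.
    final class LagExample {
        enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

        // position is the next offset the consumer will fetch; a null end offset means "not yet known"
        static OptionalLong partitionLag(long position, Long highWatermark, Long lastStableOffset,
                                         IsolationLevel isolationLevel) {
            Long end = isolationLevel == IsolationLevel.READ_COMMITTED ? lastStableOffset : highWatermark;
            return end == null ? OptionalLong.empty() : OptionalLong.of(end - position);
        }

        public static void main(String[] args) {
            // HW = 100, LSO = 50, consumer position = 0
            System.out.println(partitionLag(0, 100L, 50L, IsolationLevel.READ_UNCOMMITTED)); // OptionalLong[100]
            System.out.println(partitionLag(0, 100L, 50L, IsolationLevel.READ_COMMITTED));   // OptionalLong[50]
        }
    }
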
KafkaProducer<K, V> implements Producer<K, V> { */ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { if (transactionManager != null) - transactionManager.failIfUnreadyForSend(); + transactionManager.failIfNotReadyForSend(); TopicPartition tp = null; try { http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 0315b13..5d4af74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -469,7 +469,7 @@ public final class RecordAccumulator { ProducerIdAndEpoch producerIdAndEpoch = null; boolean isTransactional = false; if (transactionManager != null) { - if (!transactionManager.sendToPartitionAllowed(tp)) + if (!transactionManager.isSendToPartitionAllowed(tp)) break; producerIdAndEpoch = transactionManager.producerIdAndEpoch(); http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index dcd7a1f..d959d7d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -235,7 +235,7 @@ public class TransactionManager { return lastError; } - public synchronized void failIfUnreadyForSend() { + public synchronized void failIfNotReadyForSend() { if (hasError()) throw new KafkaException("Cannot perform send because at least one previous transactional or " + "idempotent request has failed with errors.", lastError); @@ -250,7 +250,7 @@ public class TransactionManager { } } - synchronized boolean sendToPartitionAllowed(TopicPartition tp) { + synchronized boolean isSendToPartitionAllowed(TopicPartition tp) { if (hasFatalError()) return false; return !isTransactional() || partitionsInTransaction.contains(tp); http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 0ed1139..cad17bc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -170,7 +170,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); @@ -212,7 +212,7 @@ public class 
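
The two renames above also mark two different checkpoints in the producer: doSend() fails fast when the transactional state is unusable, while the accumulator's drain loop silently skips partitions that have not yet been added to the transaction. A rough sketch of that split, assuming a much-simplified state holder rather than the real TransactionManager:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Simplified illustration of the two guards; field names are assumptions.
    final class TxnGuards {
        private volatile RuntimeException lastError;
        private volatile boolean transactional;
        private final Set<String> partitionsInTransaction = ConcurrentHashMap.newKeySet();

        // send() path: surface a prior transactional/idempotent failure to the caller immediately
        void failIfNotReadyForSend() {
            if (lastError != null)
                throw new IllegalStateException("Cannot perform send because a previous request failed", lastError);
        }

        // drain path: only partitions already added to the transaction may be sent
        boolean isSendToPartitionAllowed(String topicPartition) {
            return !transactional || partitionsInTransaction.contains(topicPartition);
        }
    }
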
FetcherTest { buffer.flip(); - client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); @@ -235,7 +235,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); @@ -276,7 +276,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp1)); subscriptions.seek(tp1, 1); - client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); @@ -329,7 +329,7 @@ public class FetcherTest { // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); // the first fetchedRecords() should return the first valid message @@ -351,7 +351,7 @@ public class FetcherTest { // Should not throw exception after the seek. fetcher.fetchedRecords(); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp1); @@ -384,7 +384,7 @@ public class FetcherTest { // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); // the fetchedRecords() should always throw exception due to the bad batch. 
@@ -415,7 +415,7 @@ public class FetcherTest { // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); try { fetcher.fetchedRecords(); @@ -448,7 +448,7 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp1)); subscriptions.seek(tp1, 1); - client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(memoryRecords, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, memoryRecords, Errors.NONE, 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); @@ -478,8 +478,8 @@ public class FetcherTest { subscriptions.assignFromUser(singleton(tp1)); subscriptions.seek(tp1, 1); - client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(this.records, Errors.NONE, 100L, 0)); - client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(this.nextRecords, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); @@ -562,7 +562,7 @@ public class FetcherTest { // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(records, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, records, Errors.NONE, 100L, 0)); consumerClient.poll(0); consumerRecords = fetcher.fetchedRecords().get(tp1); assertEquals(3, consumerRecords.size()); @@ -622,7 +622,7 @@ public class FetcherTest { assertFalse(fetcher.hasCompletedFetches()); MemoryRecords partialRecord = MemoryRecords.readableRecords( ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, 0})); - client.prepareResponse(fetchResponse(partialRecord, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, partialRecord, Errors.NONE, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); } @@ -634,7 +634,7 @@ public class FetcherTest { // resize the limit of the buffer to pretend it is only fetch-size large assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0)); consumerClient.poll(0); try { fetcher.fetchedRecords(); @@ -654,7 +654,7 @@ public class FetcherTest { // Now the rebalance happens and fetch positions are cleared subscriptions.assignFromSubscribed(singleton(tp1)); - client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(0); // The active fetch should be ignored since its position is no longer valid @@ -669,7 +669,7 @@ public class FetcherTest { assertEquals(1, fetcher.sendFetches()); subscriptions.pause(tp1); - client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(0); assertNull(fetcher.fetchedRecords().get(tp1)); } @@ -690,7 +690,7 @@ public class FetcherTest { subscriptions.seek(tp1, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION, 
100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -702,7 +702,7 @@ public class FetcherTest { subscriptions.seek(tp1, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -714,7 +714,7 @@ public class FetcherTest { subscriptions.seek(tp1, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertTrue(subscriptions.isOffsetResetNeeded(tp1)); @@ -729,7 +729,7 @@ public class FetcherTest { subscriptions.seek(tp1, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); subscriptions.seek(tp1, 1); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); @@ -743,7 +743,7 @@ public class FetcherTest { subscriptionsNoAutoReset.seek(tp1, 0); assertTrue(fetcherNoAutoReset.sendFetches() > 0); - client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); consumerClient.poll(0); assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1)); subscriptionsNoAutoReset.seek(tp1, 2); @@ -756,7 +756,7 @@ public class FetcherTest { subscriptionsNoAutoReset.seek(tp1, 0); fetcherNoAutoReset.sendFetches(); - client.prepareResponse(fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); consumerClient.poll(0); assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1)); @@ -824,7 +824,7 @@ public class FetcherTest { Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records)); - client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(0); assertEquals(2, fetcher.fetchedRecords().get(tp1).size()); @@ -850,7 +850,7 @@ public class FetcherTest { subscriptions.seek(tp1, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0), true); + client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0), true); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); @@ -1116,7 +1116,7 @@ public class FetcherTest { ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null); client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); - FetchResponse response 
= fetchResponse(nextRecords, Errors.NONE, i, throttleTimeMs); + FetchResponse response = fetchResponse(tp1, nextRecords, Errors.NONE, i, throttleTimeMs); buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId())); selector.completeReceive(new NetworkReceive(node.idString(), buffer)); client.poll(1, time.milliseconds()); @@ -1131,7 +1131,6 @@ public class FetcherTest { client.close(); } - /* * Send multiple requests. Verify that the client side quota metrics have the right values */ @@ -1150,7 +1149,7 @@ public class FetcherTest { assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON); // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse - fetchRecords(MemoryRecords.EMPTY, Errors.NONE, 100L, 0); + fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 0); assertEquals(100, recordsFetchLagMax.value(), EPSILON); KafkaMetric partitionLag = allMetrics.get(partitionLagMetric); @@ -1161,8 +1160,48 @@ public class FetcherTest { TimestampType.CREATE_TIME, 0L); for (int v = 0; v < 3; v++) builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes()); - fetchRecords(builder.build(), Errors.NONE, 200L, 0); + fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 0); assertEquals(197, recordsFetchLagMax.value(), EPSILON); + assertEquals(197, partitionLag.value(), EPSILON); + + // verify de-registration of partition lag + subscriptions.unsubscribe(); + assertFalse(allMetrics.containsKey(partitionLagMetric)); + } + + @Test + public void testReadCommittedLagMetric() { + Metrics metrics = new Metrics(); + fetcher = createFetcher(subscriptions, metrics, new ByteArrayDeserializer(), + new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); + + subscriptions.assignFromUser(singleton(tp1)); + subscriptions.seek(tp1, 0); + + MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax); + MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup); + + Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); + KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric); + + // recordsFetchLagMax should be initialized to negative infinity + assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON); + + // recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse + fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0); + assertEquals(50, recordsFetchLagMax.value(), EPSILON); + + KafkaMetric partitionLag = allMetrics.get(partitionLagMetric); + assertEquals(50, partitionLag.value(), EPSILON); + + // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, + TimestampType.CREATE_TIME, 0L); + for (int v = 0; v < 3; v++) + builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes()); + fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 150L, 0); + assertEquals(147, recordsFetchLagMax.value(), EPSILON); + assertEquals(147, partitionLag.value(), EPSILON); // verify de-registration of partition lag subscriptions.unsubscribe(); @@ -1188,7 +1227,7 @@ public class FetcherTest { for (Record record : records.records()) expectedBytes += record.sizeInBytes(); - fetchRecords(records, Errors.NONE, 100L, 0); + fetchRecords(tp1, records, Errors.NONE, 100L, 0); 
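
The expected values in the lag assertions follow directly from the isolation-aware lag rule: with READ_UNCOMMITTED the reference point is the high watermark (200 - 3 = 197 once the three records at offsets 0..2 are consumed), and with READ_COMMITTED it is the last stable offset (150 - 3 = 147, or 50 - 0 = 50 for the empty fetch). A small arithmetic check using the same numbers as the tests (illustrative, not test code):

    public class LagArithmetic {
        public static void main(String[] args) {
            long nextPosition = 3;                 // three records at offsets 0..2 were consumed
            long hw = 200, lsoAfterRecords = 150;  // values returned in the fetch responses above

            System.out.println(hw - nextPosition);               // 197 (READ_UNCOMMITTED lag)
            System.out.println(lsoAfterRecords - nextPosition);  // 147 (READ_COMMITTED lag)

            long lsoEmptyFetch = 50, fetchOffset = 0;
            System.out.println(lsoEmptyFetch - fetchOffset);     // 50 (READ_COMMITTED lag after the empty fetch)
        }
    }
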
assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON); assertEquals(3, recordsCountAverage.value(), EPSILON); } @@ -1214,7 +1253,7 @@ public class FetcherTest { expectedBytes += record.sizeInBytes(); } - fetchRecords(records, Errors.NONE, 100L, 0); + fetchRecords(tp1, records, Errors.NONE, 100L, 0); assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON); assertEquals(2, recordsCountAverage.value(), EPSILON); } @@ -1294,9 +1333,15 @@ public class FetcherTest { assertEquals(3, recordsCountAverage.value(), EPSILON); } - private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, Errors error, long hw, int throttleTime) { + private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords( + TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) { + return fetchRecords(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime); + } + + private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords( + TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) { assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(records, error, hw, throttleTime)); + client.prepareResponse(fetchResponse(tp, records, error, hw, lastStableOffset, throttleTime)); consumerClient.poll(0); return fetcher.fetchedRecords(); } @@ -1821,13 +1866,14 @@ public class FetcherTest { return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime); } - private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) { - return fetchResponse(tp1, records, error, hw, throttleTime); + private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) { + return fetchResponse(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime); } - private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) { + private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, + long lastStableOffset, int throttleTime) { Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp, - new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records)); + new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, null, records)); return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime); } http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 4f2f10f..55c9241 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -123,48 +123,48 @@ public class TransactionManagerTest { } @Test(expected = IllegalStateException.class) - public void testFailIfUnreadyForSendNoProducerId() { - transactionManager.failIfUnreadyForSend(); + public void testFailIfNotReadyForSendNoProducerId() { + transactionManager.failIfNotReadyForSend(); } @Test - 
public void testFailIfUnreadyForSendIdempotentProducer() { + public void testFailIfNotReadyForSendIdempotentProducer() { TransactionManager idempotentTransactionManager = new TransactionManager(); - idempotentTransactionManager.failIfUnreadyForSend(); + idempotentTransactionManager.failIfNotReadyForSend(); } @Test(expected = KafkaException.class) - public void testFailIfUnreadyForSendIdempotentProducerFatalError() { + public void testFailIfNotReadyForSendIdempotentProducerFatalError() { TransactionManager idempotentTransactionManager = new TransactionManager(); idempotentTransactionManager.transitionToFatalError(new KafkaException()); - idempotentTransactionManager.failIfUnreadyForSend(); + idempotentTransactionManager.failIfNotReadyForSend(); } @Test(expected = IllegalStateException.class) - public void testFailIfUnreadyForSendNoOngoingTransaction() { + public void testFailIfNotReadyForSendNoOngoingTransaction() { long pid = 13131L; short epoch = 1; doInitTransactions(pid, epoch); - transactionManager.failIfUnreadyForSend(); + transactionManager.failIfNotReadyForSend(); } @Test(expected = KafkaException.class) - public void testFailIfUnreadyForSendAfterAbortableError() { + public void testFailIfNotReadyForSendAfterAbortableError() { long pid = 13131L; short epoch = 1; doInitTransactions(pid, epoch); transactionManager.beginTransaction(); transactionManager.transitionToAbortableError(new KafkaException()); - transactionManager.failIfUnreadyForSend(); + transactionManager.failIfNotReadyForSend(); } @Test(expected = KafkaException.class) - public void testFailIfUnreadyForSendAfterFatalError() { + public void testFailIfNotReadyForSendAfterFatalError() { long pid = 13131L; short epoch = 1; doInitTransactions(pid, epoch); transactionManager.transitionToFatalError(new KafkaException()); - transactionManager.failIfUnreadyForSend(); + transactionManager.failIfNotReadyForSend(); } @Test @@ -334,7 +334,7 @@ public class TransactionManagerTest { } @Test - public void testSendToPartitionAllowedWithPendingPartitionAfterAbortableError() { + public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() { final long pid = 13131L; final short epoch = 1; @@ -344,12 +344,12 @@ public class TransactionManagerTest { transactionManager.maybeAddPartitionToTransaction(tp0); transactionManager.transitionToAbortableError(new KafkaException()); - assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); assertTrue(transactionManager.hasAbortableError()); } @Test - public void testSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() { + public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() { final long pid = 13131L; final short epoch = 1; @@ -362,12 +362,12 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); transactionManager.transitionToAbortableError(new KafkaException()); - assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); assertTrue(transactionManager.hasAbortableError()); } @Test - public void testSendToPartitionAllowedWithPendingPartitionAfterFatalError() { + public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() { final long pid = 13131L; final short epoch = 1; @@ -377,12 +377,12 @@ public class TransactionManagerTest { transactionManager.maybeAddPartitionToTransaction(tp0); transactionManager.transitionToFatalError(new KafkaException()); - 
assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); assertTrue(transactionManager.hasFatalError()); } @Test - public void testSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() { + public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() { final long pid = 13131L; final short epoch = 1; @@ -395,12 +395,12 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); transactionManager.transitionToFatalError(new KafkaException()); - assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); assertTrue(transactionManager.hasFatalError()); } @Test - public void testSendToPartitionAllowedWithAddedPartitionAfterAbortableError() { + public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() { final long pid = 13131L; final short epoch = 1; @@ -414,12 +414,12 @@ public class TransactionManagerTest { assertFalse(transactionManager.hasPartitionsToAdd()); transactionManager.transitionToAbortableError(new KafkaException()); - assertTrue(transactionManager.sendToPartitionAllowed(tp0)); + assertTrue(transactionManager.isSendToPartitionAllowed(tp0)); assertTrue(transactionManager.hasAbortableError()); } @Test - public void testSendToPartitionAllowedWithAddedPartitionAfterFatalError() { + public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() { final long pid = 13131L; final short epoch = 1; @@ -432,17 +432,17 @@ public class TransactionManagerTest { assertFalse(transactionManager.hasPartitionsToAdd()); transactionManager.transitionToFatalError(new KafkaException()); - assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); assertTrue(transactionManager.hasFatalError()); } @Test - public void testSendToPartitionAllowedWithUnaddedPartition() { + public void testIsSendToPartitionAllowedWithPartitionNotAdded() { final long pid = 13131L; final short epoch = 1; doInitTransactions(pid, epoch); transactionManager.beginTransaction(); - assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); } @Test(expected = IllegalStateException.class) @@ -487,11 +487,11 @@ public class TransactionManagerTest { prepareProduceResponse(Errors.NONE, pid, epoch); assertFalse(transactionManager.transactionContainsPartition(tp0)); - assertFalse(transactionManager.sendToPartitionAllowed(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); sender.run(time.milliseconds()); // send addPartitions. // Check that only addPartitions was sent. assertTrue(transactionManager.transactionContainsPartition(tp0)); - assertTrue(transactionManager.sendToPartitionAllowed(tp0)); + assertTrue(transactionManager.isSendToPartitionAllowed(tp0)); assertFalse(responseFuture.isDone()); sender.run(time.milliseconds()); // send produce request. 
@@ -1300,8 +1300,8 @@ public class TransactionManagerTest { accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); - assertFalse(transactionManager.sendToPartitionAllowed(tp0)); - assertFalse(transactionManager.sendToPartitionAllowed(tp1)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp1)); Node node1 = new Node(0, "localhost", 1111); Node node2 = new Node(1, "localhost", 1112); @@ -1341,7 +1341,7 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // Send AddPartitions, should be in abortable state. assertTrue(transactionManager.hasAbortableError()); - assertTrue(transactionManager.sendToPartitionAllowed(tp1)); + assertTrue(transactionManager.isSendToPartitionAllowed(tp1)); // Try to drain a message destined for tp1, it should get drained. Node node1 = new Node(1, "localhost", 1112); @@ -1361,30 +1361,6 @@ public class TransactionManagerTest { assertTrue(transactionManager.hasAbortableError()); } - @Test - public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException { - final long pid = 13131L; - final short epoch = 1; - doInitTransactions(pid, epoch); - transactionManager.beginTransaction(); - // Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain. - accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); - Node node1 = new Node(0, "localhost", 1111); - PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null); - - Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1), - Collections.<String>emptySet(), Collections.<String>emptySet()); - Set<Node> nodes = new HashSet<>(); - nodes.add(node1); - Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, nodes, Integer.MAX_VALUE, - time.milliseconds()); - - // We shouldn't drain batches which haven't been added to the transaction yet. 
- assertTrue(drainedBatches.containsKey(node1.id())); - assertTrue(drainedBatches.get(node1.id()).isEmpty()); - } - private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { final long pid = 1L; final short epoch = 1; http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e617404..aa11ba1 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -98,6 +98,17 @@ class Partition(val topic: String, tags ) + newGauge("LastStableOffsetLag", + new Gauge[Long] { + def value = { + leaderReplicaIfLocal.map { replica => + replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset + }.getOrElse(0) + } + }, + tags + ) + private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined def isUnderReplicated: Boolean = http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 4bd2c2c..7a3bc94 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -868,13 +868,18 @@ class Log(@volatile var dir: File, // We create the local variables to avoid race conditions with updates to the log. val currentNextOffsetMetadata = nextOffsetMetadata val next = currentNextOffsetMetadata.messageOffset - if(startOffset == next) - return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY) + if (startOffset == next) { + val abortedTransactions = + if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(List.empty[AbortedTransaction]) + else None + return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false, + abortedTransactions = abortedTransactions) + } var segmentEntry = segments.floorEntry(startOffset) // return error on attempt to read beyond the log end offset or read below log start offset - if(startOffset > next || segmentEntry == null || startOffset < logStartOffset) + if (startOffset > next || segmentEntry == null || startOffset < logStartOffset) throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next)) // Do the read on the segment with a base offset less than the target offset http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/server/DelayedFetch.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 13a01bc..8a9ce02 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -156,8 +156,8 @@ class DelayedFetch(delayMs: Long, isolationLevel = isolationLevel) val fetchPartitionData = logReadResults.map { case (tp, result) => - tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records, - result.info.abortedTransactions) + tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, + 
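
The new per-partition LastStableOffsetLag gauge reports how far the last stable offset trails the high watermark on the leader; a persistently large value usually points at an open (possibly hanging) transaction pinning the LSO. A rough Java sketch of the same computation (the real code registers a Scala gauge on Partition; the supplier type here is an assumption):

    import java.util.Optional;
    import java.util.function.Supplier;

    // Illustrative stand-in for the gauge: lag = high watermark - last stable offset.
    final class LastStableOffsetLagGauge {
        static final class LeaderOffsets {
            final long highWatermark;
            final long lastStableOffset;
            LeaderOffsets(long highWatermark, long lastStableOffset) {
                this.highWatermark = highWatermark;
                this.lastStableOffset = lastStableOffset;
            }
        }

        private final Supplier<Optional<LeaderOffsets>> leaderReplicaIfLocal;

        LastStableOffsetLagGauge(Supplier<Optional<LeaderOffsets>> leaderReplicaIfLocal) {
            this.leaderReplicaIfLocal = leaderReplicaIfLocal;
        }

        long value() {
            // 0 when this broker is not the leader, mirroring the getOrElse(0) in the Scala gauge
            return leaderReplicaIfLocal.get()
                    .map(o -> o.highWatermark - o.lastStableOffset)
                    .orElse(0L);
        }
    }
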
result.lastStableOffset, result.info.abortedTransactions) } responseCallback(fetchPartitionData) http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1d289b3..6cff0e6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -533,7 +533,8 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionData = { responsePartitionData.map { case (tp, data) => val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull - tp -> new FetchResponse.PartitionData(data.error, data.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, + val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET) + tp -> new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset, data.logStartOffset, abortedTransactions, data.records) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index d2670b7..853b7c4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -74,12 +74,13 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc * @param error Exception if error encountered while reading from the log */ case class LogReadResult(info: FetchDataInfo, - hw: Long, + highWatermark: Long, leaderLogStartOffset: Long, leaderLogEndOffset: Long, followerLogStartOffset: Long, fetchTimeMs: Long, readSize: Int, + lastStableOffset: Option[Long], exception: Option[Throwable] = None) { def error: Errors = exception match { @@ -88,22 +89,27 @@ case class LogReadResult(info: FetchDataInfo, } override def toString = - s"Fetch Data: [$info], HW: [$hw], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + + s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]" } -case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records, +case class FetchPartitionData(error: Errors = Errors.NONE, + highWatermark: Long, + logStartOffset: Long, + records: Records, + lastStableOffset: Option[Long], abortedTransactions: Option[List[AbortedTransaction]]) object LogReadResult { val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), - hw = -1L, + highWatermark = -1L, leaderLogStartOffset = -1L, leaderLogEndOffset = -1L, followerLogStartOffset = -1L, fetchTimeMs = -1L, - readSize = -1) + readSize = -1, + lastStableOffset = None) } case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Errors], error: Errors) { @@ -627,8 +633,8 @@ class ReplicaManager(val config: KafkaConfig, // 4) some error happens while reading data if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = 
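
End to end the LSO is optional: ReplicaManager only computes it for READ_COMMITTED fetches, KafkaApis maps a missing value to FetchResponse.INVALID_LAST_STABLE_OFFSET on the wire, and the consumer only applies the update when the field is non-negative (see the Fetcher hunk near the top). A hedged sketch of that round trip, assuming a sentinel of -1 to match the ">= 0" check:

    import java.util.Optional;
    import java.util.OptionalLong;

    // Illustrative only: how an optional LSO survives the wire as a sentinel value.
    final class LsoPropagation {
        static final long INVALID_LAST_STABLE_OFFSET = -1L; // assumed sentinel

        // broker side: READ_COMMITTED reads carry an LSO, everything else sends the sentinel
        static long toWire(Optional<Long> lastStableOffset) {
            return lastStableOffset.orElse(INVALID_LAST_STABLE_OFFSET);
        }

        // consumer side: ignore the field unless the broker actually set it
        static OptionalLong fromWire(long wireValue) {
            return wireValue >= 0 ? OptionalLong.of(wireValue) : OptionalLong.empty();
        }

        public static void main(String[] args) {
            System.out.println(fromWire(toWire(Optional.of(42L))));  // OptionalLong[42]
            System.out.println(fromWire(toWire(Optional.empty())));  // OptionalLong.empty
        }
    }
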
logReadResults.map { case (tp, result) => - tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records, - result.info.abortedTransactions) + tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, + result.lastStableOffset, result.info.abortedTransactions) } responseCallback(fetchPartitionData) } else { @@ -684,11 +690,15 @@ class ReplicaManager(val config: KafkaConfig, else getReplicaOrException(tp) - // decide whether to only fetch committed data (i.e. messages below high watermark) - val maxOffsetOpt = if (isolationLevel == IsolationLevel.READ_COMMITTED) + val initialHighWatermark = localReplica.highWatermark.messageOffset + val lastStableOffset = if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(localReplica.lastStableOffset.messageOffset) - else if (readOnlyCommitted) - Some(localReplica.highWatermark.messageOffset) + else + None + + // decide whether to only fetch committed data (i.e. messages below high watermark) + val maxOffsetOpt = if (readOnlyCommitted) + Some(lastStableOffset.getOrElse(initialHighWatermark)) else None @@ -699,7 +709,6 @@ class ReplicaManager(val config: KafkaConfig, * This can cause a replica to always be out of sync. */ val initialLogEndOffset = localReplica.logEndOffset.messageOffset - val initialHighWatermark = localReplica.highWatermark.messageOffset val initialLogStartOffset = localReplica.logStartOffset val fetchTimeMs = time.milliseconds val logReadInfo = localReplica.log match { @@ -724,12 +733,13 @@ class ReplicaManager(val config: KafkaConfig, } LogReadResult(info = logReadInfo, - hw = initialHighWatermark, + highWatermark = initialHighWatermark, leaderLogStartOffset = initialLogStartOffset, leaderLogEndOffset = initialLogEndOffset, followerLogStartOffset = followerLogStartOffset, fetchTimeMs = fetchTimeMs, readSize = partitionFetchSize, + lastStableOffset = lastStableOffset, exception = None) } catch { // NOTE: Failed fetch requests metric is not incremented for known exceptions since it @@ -739,24 +749,26 @@ class ReplicaManager(val config: KafkaConfig, _: ReplicaNotAvailableException | _: OffsetOutOfRangeException) => LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), - hw = -1L, + highWatermark = -1L, leaderLogStartOffset = -1L, leaderLogEndOffset = -1L, followerLogStartOffset = -1L, fetchTimeMs = -1L, readSize = partitionFetchSize, + lastStableOffset = None, exception = Some(e)) case e: Throwable => brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark() brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark() error(s"Error processing fetch operation on partition $tp, offset $offset", e) LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), - hw = -1L, + highWatermark = -1L, leaderLogStartOffset = -1L, leaderLogEndOffset = -1L, followerLogStartOffset = -1L, fetchTimeMs = -1L, readSize = partitionFetchSize, + lastStableOffset = None, exception = Some(e)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index da94569..5d221fe 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ 
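
The read path above picks the upper bound of a fetch in three tiers: follower fetches are unbounded (they read up to the log end offset), READ_UNCOMMITTED consumer fetches stop at the high watermark, and READ_COMMITTED consumer fetches stop at the last stable offset. A compact sketch of that decision (parameter names are assumptions, not the ReplicaManager API):

    import java.util.OptionalLong;

    final class FetchUpperBound {
        enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

        // readOnlyCommitted is false for follower fetches and true for consumer fetches
        static OptionalLong maxOffset(boolean readOnlyCommitted, IsolationLevel isolationLevel,
                                      long highWatermark, long lastStableOffset) {
            if (!readOnlyCommitted)
                return OptionalLong.empty(); // follower: no upper bound, read to the log end offset
            long bound = isolationLevel == IsolationLevel.READ_COMMITTED ? lastStableOffset : highWatermark;
            return OptionalLong.of(bound);
        }

        public static void main(String[] args) {
            System.out.println(maxOffset(false, IsolationLevel.READ_UNCOMMITTED, 100, 50)); // empty
            System.out.println(maxOffset(true, IsolationLevel.READ_UNCOMMITTED, 100, 50));  // 100
            System.out.println(maxOffset(true, IsolationLevel.READ_COMMITTED, 100, 50));    // 50
        }
    }
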
b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -80,12 +80,13 @@ class IsrExpirationTest { // let the follower catch up to the Leader logEndOffset (15) for (replica <- partition0.assignedReplicas - leaderReplica) replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), - hw = 15L, + highWatermark = 15L, leaderLogStartOffset = 0L, leaderLogEndOffset = 15L, followerLogStartOffset = 0L, fetchTimeMs = time.milliseconds, - readSize = -1)) + readSize = -1, + lastStableOffset = None)) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -134,12 +135,13 @@ class IsrExpirationTest { // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms for (replica <- partition0.assignedReplicas - leaderReplica) replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), - hw = 10L, + highWatermark = 10L, leaderLogStartOffset = 0L, leaderLogEndOffset = 15L, followerLogStartOffset = 0L, fetchTimeMs = time.milliseconds, - readSize = -1)) + readSize = -1, + lastStableOffset = None)) // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log. // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck @@ -150,12 +152,13 @@ class IsrExpirationTest { (partition0.assignedReplicas - leaderReplica).foreach { r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), - hw = 11L, + highWatermark = 11L, leaderLogStartOffset = 0L, leaderLogEndOffset = 15L, followerLogStartOffset = 0L, fetchTimeMs = time.milliseconds, - readSize = -1)) + readSize = -1, + lastStableOffset = None)) } partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -169,12 +172,13 @@ class IsrExpirationTest { // Now actually make a fetch to the end of the log. 
The replicas should be back in ISR (partition0.assignedReplicas - leaderReplica).foreach { r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), - hw = 15L, + highWatermark = 15L, leaderLogStartOffset = 0L, leaderLogEndOffset = 15L, followerLogStartOffset = 0L, fetchTimeMs = time.milliseconds, - readSize = -1)) + readSize = -1, + lastStableOffset = None)) } partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -195,12 +199,13 @@ class IsrExpirationTest { // set lastCaughtUpTime to current time for (replica <- partition.assignedReplicas - leaderReplica) replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY), - hw = 0L, + highWatermark = 0L, leaderLogStartOffset = 0L, leaderLogEndOffset = 0L, followerLogStartOffset = 0L, fetchTimeMs = time.milliseconds, - readSize = -1)) + readSize = -1, + lastStableOffset = None)) // set the leader and its hw and the hw update time partition.leaderReplicaIdOpt = Some(leaderId) partition http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 45f62d7..e078e12 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -32,9 +32,10 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, PartitionState} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.FetchRequest.PartitionData +import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.{Node, TopicPartition} import org.easymock.EasyMock -import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.Assert._ import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ @@ -139,20 +140,6 @@ class ReplicaManagerTest { metadataCache) try { - var produceCallbackFired = false - def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = { - assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION, - responseStatus.values.head.error) - produceCallbackFired = true - } - - var fetchCallbackFired = false - def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = { - assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION, - responseStatus.map(_._2).head.error) - fetchCallbackFired = true - } - val brokerList = Seq[Integer](0, 1).asJava val brokerSet = Set[Integer](0, 1).asJava @@ -166,11 +153,14 @@ class ReplicaManagerTest { rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes())) - appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback) + val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response => + assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error) + } // Fetch some messages - fetchAsConsumer(rm, Seq(new 
TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), fetchCallback, + val fetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000), minBytes = 100000) + assertFalse(fetchResult.isFired) // Make this replica the follower val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, @@ -178,8 +168,8 @@ class ReplicaManagerTest { Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) - assertTrue(produceCallbackFired) - assertTrue(fetchCallbackFired) + assertTrue(appendResult.isFired) + assertTrue(fetchResult.isFired) } finally { rm.shutdown(checkpointHW = false) } @@ -204,10 +194,6 @@ class ReplicaManagerTest { replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) - def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = - responseStatus.values.foreach { status => - assertEquals(Errors.NONE, status.error) - } val producerId = 234L val epoch = 5.toShort @@ -217,67 +203,64 @@ class ReplicaManagerTest { for (sequence <- 0 until numRecords) { val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) - appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> records), produceCallback) - } - - var fetchCallbackFired = false - var fetchError = Errors.NONE - var fetchedRecords: Records = null - def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = { - fetchError = responseStatus.map(_._2).head.error - fetchedRecords = responseStatus.map(_._2).head.records - fetchCallbackFired = true + appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response => + assertEquals(Errors.NONE, response.error) + } } // fetch as follower to advance the high watermark - fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords, 0, 100000)), - fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED) + fetchAsFollower(replicaManager, new TopicPartition(topic, 0), new PartitionData(numRecords, 0, 100000), + isolationLevel = IsolationLevel.READ_UNCOMMITTED) // fetch should return empty since LSO should be stuck at 0 - fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED) - assertTrue(fetchCallbackFired) - assertEquals(Errors.NONE, fetchError) - assertTrue(fetchedRecords.batches.asScala.isEmpty) - fetchCallbackFired = false + var consumerFetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), + new PartitionData(0, 0, 100000), isolationLevel = IsolationLevel.READ_COMMITTED) + var fetchData = consumerFetchResult.assertFired + assertEquals(Errors.NONE, fetchData.error) + assertTrue(fetchData.records.batches.asScala.isEmpty) + assertEquals(Some(0), fetchData.lastStableOffset) + assertEquals(Some(List.empty[AbortedTransaction]), fetchData.abortedTransactions) // delayed fetch should timeout and return nothing - fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 1000) + consumerFetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000), + isolationLevel = IsolationLevel.READ_COMMITTED, 
minBytes = 1000) + assertFalse(consumerFetchResult.isFired) timer.advanceClock(1001) - assertTrue(fetchCallbackFired) - assertEquals(Errors.NONE, fetchError) - assertTrue(fetchedRecords.batches.asScala.isEmpty) - fetchCallbackFired = false + fetchData = consumerFetchResult.assertFired + assertEquals(Errors.NONE, fetchData.error) + assertTrue(fetchData.records.batches.asScala.isEmpty) + assertEquals(Some(0), fetchData.lastStableOffset) + assertEquals(Some(List.empty[AbortedTransaction]), fetchData.abortedTransactions) // now commit the transaction val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker) - appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> commitRecordBatch), produceCallback, - isFromClient = false) + appendRecords(replicaManager, new TopicPartition(topic, 0), commitRecordBatch, isFromClient = false) + .onFire { response => assertEquals(Errors.NONE, response.error) } // the LSO has advanced, but the appended commit marker has not been replicated, so // none of the data from the transaction should be visible yet - fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED) + consumerFetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000), + isolationLevel = IsolationLevel.READ_COMMITTED) - assertTrue(fetchCallbackFired) - assertEquals(Errors.NONE, fetchError) - assertTrue(fetchedRecords.batches.asScala.isEmpty) - fetchCallbackFired = false + fetchData = consumerFetchResult.assertFired + assertEquals(Errors.NONE, fetchData.error) + assertTrue(fetchData.records.batches.asScala.isEmpty) // fetch as follower to advance the high watermark - fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)), - fetchCallback, isolationLevel = IsolationLevel.READ_UNCOMMITTED) + fetchAsFollower(replicaManager, new TopicPartition(topic, 0), new PartitionData(numRecords + 1, 0, 100000), + isolationLevel = IsolationLevel.READ_UNCOMMITTED) // now all of the records should be fetchable - fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED) + consumerFetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000), + isolationLevel = IsolationLevel.READ_COMMITTED) - assertTrue(fetchCallbackFired) - assertEquals(Errors.NONE, fetchError) - assertEquals(numRecords + 1, fetchedRecords.batches.asScala.size) + fetchData = consumerFetchResult.assertFired + assertEquals(Errors.NONE, fetchData.error) + assertEquals(Some(numRecords + 1), fetchData.lastStableOffset) + assertEquals(Some(List.empty[AbortedTransaction]), fetchData.abortedTransactions) + assertEquals(numRecords + 1, fetchData.records.batches.asScala.size) } finally { replicaManager.shutdown(checkpointHW = false) } @@ -302,11 +285,6 @@ class ReplicaManagerTest { replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) - def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = - responseStatus.values.foreach { status => - assertEquals(Errors.NONE, status.error) - } - val producerId = 234L val epoch = 5.toShort @@ -315,35 +293,32 @@ class 
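
The READ_COMMITTED fetch test traces how the LSO moves: while the transaction is open the LSO stays pinned at the transaction's first offset (0 here), so committed reads return nothing even though the high watermark advances; once the COMMIT marker is appended at offset numRecords and replicated, the LSO jumps past it to numRecords + 1 and the whole transaction becomes visible. A small timeline with assumed concrete numbers (the test itself only uses a symbolic numRecords):

    public class LsoTimeline {
        public static void main(String[] args) {
            int numRecords = 4; // assumed for illustration

            // transactional records occupy offsets 0..numRecords-1; the LSO is held at the
            // first offset of the open transaction regardless of the high watermark
            long lsoWhileOpen = 0;

            // the COMMIT marker lands at offset numRecords; after it is replicated and the
            // high watermark passes it, the LSO advances beyond the marker
            long commitMarkerOffset = numRecords;
            long lsoAfterCommit = commitMarkerOffset + 1;

            System.out.println("LSO while transaction open: " + lsoWhileOpen);   // 0
            System.out.println("LSO after commit marker:    " + lsoAfterCommit); // 5 (= numRecords + 1)
        }
    }
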
ReplicaManagerTest { for (sequence <- 0 until numRecords) { val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) - appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> records), produceCallback) + appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response => + assertEquals(Errors.NONE, response.error) + } } // now abort the transaction val endTxnMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0) val abortRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker) - appendRecords(replicaManager, Map(new TopicPartition(topic, 0) -> abortRecordBatch), produceCallback, - isFromClient = false) + appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch, isFromClient = false) + .onFire { response => assertEquals(Errors.NONE, response.error) } // fetch as follower to advance the high watermark - fetchAsFollower(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(numRecords + 1, 0, 100000)), - responseState => (), isolationLevel = IsolationLevel.READ_UNCOMMITTED) - - var fetchDataOpt: Option[FetchPartitionData] = None - def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = { - fetchDataOpt = Some(responseStatus.map(_._2).head) - } + fetchAsFollower(replicaManager, new TopicPartition(topic, 0), new PartitionData(numRecords + 1, 0, 100000), + isolationLevel = IsolationLevel.READ_UNCOMMITTED) // Set the minBytes in order force this request to enter purgatory. When it returns, we should still // see the newly aborted transaction. - fetchAsConsumer(replicaManager, Seq(new TopicPartition(topic, 0) -> new PartitionData(0, 0, 100000)), - fetchCallback, isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 10000) - assertTrue(fetchDataOpt.isEmpty) + val fetchResult = fetchAsConsumer(replicaManager, new TopicPartition(topic, 0), new PartitionData(0, 0, 100000), + isolationLevel = IsolationLevel.READ_COMMITTED, minBytes = 10000) + assertFalse(fetchResult.isFired) timer.advanceClock(1001) - assertTrue(fetchDataOpt.isDefined) + val fetchData = fetchResult.assertFired - val fetchData = fetchDataOpt.get assertEquals(Errors.NONE, fetchData.error) + assertEquals(Some(numRecords + 1), fetchData.lastStableOffset) assertEquals(numRecords + 1, fetchData.records.records.asScala.size) assertTrue(fetchData.abortedTransactions.isDefined) assertEquals(1, fetchData.abortedTransactions.get.size) @@ -389,84 +364,118 @@ class ReplicaManagerTest { rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0)) - def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {} - // Append a couple of messages. 
for(i <- 1 to 2) { val records = TestUtils.singletonRecords(s"message $i".getBytes) - appendRecords(rm, Map(new TopicPartition(topic, 0) -> records), produceCallback) - } - - var fetchCallbackFired = false - var fetchError = Errors.NONE - var fetchedRecords: Records = null - def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = { - fetchError = responseStatus.map(_._2).head.error - fetchedRecords = responseStatus.map(_._2).head.records - fetchCallbackFired = true + appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response => + assertEquals(Errors.NONE, response.error) + } } // Fetch a message above the high watermark as a follower - fetchAsFollower(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback) - assertTrue(fetchCallbackFired) - assertEquals("Should not give an exception", Errors.NONE, fetchError) - assertTrue("Should return some data", fetchedRecords.batches.iterator.hasNext) - fetchCallbackFired = false + val followerFetchResult = fetchAsFollower(rm, new TopicPartition(topic, 0), new PartitionData(1, 0, 100000)) + val followerFetchData = followerFetchResult.assertFired + assertEquals("Should not give an exception", Errors.NONE, followerFetchData.error) + assertTrue("Should return some data", followerFetchData.records.batches.iterator.hasNext) // Fetch a message above the high watermark as a consumer - fetchAsConsumer(rm, Seq(new TopicPartition(topic, 0) -> new PartitionData(1, 0, 100000)), fetchCallback) - assertTrue(fetchCallbackFired) - assertEquals("Should not give an exception", Errors.NONE, fetchError) - assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords) + val consumerFetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0), new PartitionData(1, 0, 100000)) + val consumerFetchData = consumerFetchResult.assertFired + assertEquals("Should not give an exception", Errors.NONE, consumerFetchData.error) + assertEquals("Should return empty response", MemoryRecords.EMPTY, consumerFetchData.records) } finally { rm.shutdown(checkpointHW = false) } } + private class CallbackResult[T] { + private var value: Option[T] = None + private var fun: Option[T => Unit] = None + + def assertFired: T = { + assertTrue("Callback has not been fired", isFired) + value.get + } + + def isFired: Boolean = { + value.isDefined + } + + def fire(value: T): Unit = { + this.value = Some(value) + fun.foreach(f => f(value)) + } + + def onFire(fun: T => Unit): CallbackResult[T] = { + this.fun = Some(fun) + if (this.isFired) fire(value.get) + this + } + } + private def appendRecords(replicaManager: ReplicaManager, - entriesPerPartition: Map[TopicPartition, MemoryRecords], - responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - isFromClient: Boolean = true): Unit = { + partition: TopicPartition, + records: MemoryRecords, + isFromClient: Boolean = true): CallbackResult[PartitionResponse] = { + val result = new CallbackResult[PartitionResponse]() + def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = { + val response = responses.get(partition) + assertTrue(response.isDefined) + result.fire(response.get) + } + replicaManager.appendRecords( timeout = 1000, requiredAcks = -1, internalTopicsAllowed = false, isFromClient = isFromClient, - entriesPerPartition = entriesPerPartition, - responseCallback = responseCallback) + entriesPerPartition = Map(partition -> records), + responseCallback = appendCallback) + + result } private def fetchAsConsumer(replicaManager: 
ReplicaManager, - fetchInfos: Seq[(TopicPartition, PartitionData)], - fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, + partition: TopicPartition, + partitionData: PartitionData, minBytes: Int = 0, - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = { - fetchMessages(replicaManager, replicaId = -1, fetchInfos, fetchCallback, minBytes, isolationLevel) + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = { + fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel) } private def fetchAsFollower(replicaManager: ReplicaManager, - fetchInfos: Seq[(TopicPartition, PartitionData)], - fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, + partition: TopicPartition, + partitionData: PartitionData, minBytes: Int = 0, - isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): Unit = { - fetchMessages(replicaManager, replicaId = 1, fetchInfos, fetchCallback, minBytes, isolationLevel) + isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = { + fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel) } private def fetchMessages(replicaManager: ReplicaManager, replicaId: Int, - fetchInfos: Seq[(TopicPartition, PartitionData)], - fetchCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, + partition: TopicPartition, + partitionData: PartitionData, minBytes: Int, - isolationLevel: IsolationLevel): Unit = { + isolationLevel: IsolationLevel): CallbackResult[FetchPartitionData] = { + val result = new CallbackResult[FetchPartitionData]() + def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = { + assertEquals(1, responseStatus.size) + val (topicPartition, fetchData) = responseStatus.head + assertEquals(partition, topicPartition) + result.fire(fetchData) + } + replicaManager.fetchMessages( timeout = 1000, replicaId = replicaId, fetchMinBytes = minBytes, fetchMaxBytes = Int.MaxValue, hardMaxBytesLimit = false, - fetchInfos = fetchInfos, + fetchInfos = Seq(partition -> partitionData), responseCallback = fetchCallback, isolationLevel = isolationLevel) + + result } private def setupReplicaManagerWithMockedPurgatories(timer: MockTimer): ReplicaManager = { http://git-wip-us.apache.org/repos/asf/kafka/blob/172041d5/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index dad4b78..72d7fc5 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -124,12 +124,13 @@ class SimpleFetchTest { val followerReplica= new Replica(configs(1).brokerId, partition, time) val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) followerReplica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(leo, MemoryRecords.EMPTY), - hw = leo.messageOffset, + highWatermark = leo.messageOffset, leaderLogStartOffset = 0L, leaderLogEndOffset = leo.messageOffset, followerLogStartOffset = 0L, fetchTimeMs = time.milliseconds, - readSize = -1)) + readSize = -1, + lastStableOffset = None)) // add both of them to ISR val allReplicas = List(leaderReplica, followerReplica)
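----------------------------------------------------------------------

Editor's note on the test refactor above: the new CallbackResult[T] helper replaces the mutable fetchCallbackFired / fetchedRecords flags with a single-shot, promise-like holder. The response callback calls fire(value); the test either checks isFired / assertFired or registers an assertion with onFire, which also runs if the result has already fired. A minimal standalone sketch of that behaviour, assuming nothing beyond the snippet itself (plain assert stands in for the JUnit assertions used in the real test; the demo object name is illustrative):

// Single-shot result holder in the style of the CallbackResult added above.
class CallbackResult[T] {
  private var value: Option[T] = None
  private var fun: Option[T => Unit] = None

  def isFired: Boolean = value.isDefined

  def assertFired: T = {
    assert(isFired, "Callback has not been fired")
    value.get
  }

  // Invoked by the response callback when the broker completes the request.
  def fire(v: T): Unit = {
    value = Some(v)
    fun.foreach(f => f(v))
  }

  // Register an assertion to run on completion; if the request already
  // completed synchronously, run it right away.
  def onFire(f: T => Unit): CallbackResult[T] = {
    fun = Some(f)
    value.foreach(f)
    this
  }
}

object CallbackResultDemo extends App {
  val result = new CallbackResult[String]
  assert(!result.isFired)                       // request still sitting in purgatory
  result.onFire(r => println(s"completed: $r")) // runs once fire() is called
  result.fire("NONE")                           // the response callback fires it
  assert(result.assertFired == "NONE")
}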

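The lastStableOffset assertions in the transactional tests above (Some(0) while the transaction is open, Some(numRecords + 1) once the commit marker has been replicated) follow from the LSO rule: an open transaction pins the LSO at its first offset, and with no open transaction the LSO tracks the high watermark. A rough, simplified sketch of that rule, illustrative only and not the broker's actual bookkeeping (which lives in Log and the producer state manager):

object LsoRuleSketch extends App {
  // An open transaction pins the LSO at its first offset; with no open
  // transaction the LSO tracks the high watermark.
  def lastStableOffset(highWatermark: Long, openTxnFirstOffsets: Seq[Long]): Long =
    openTxnFirstOffsets.reduceOption(_ min _) match {
      case Some(firstOpenOffset) => math.min(firstOpenOffset, highWatermark)
      case None                  => highWatermark
    }

  // Three transactional records at offsets 0..2, high watermark advanced to 3 by the
  // follower fetch, transaction still open => LSO stays at 0 (the Some(0) assertion).
  assert(lastStableOffset(highWatermark = 3L, openTxnFirstOffsets = Seq(0L)) == 0L)

  // Commit marker written at offset 3 and replicated (high watermark 4), no open
  // transaction left => LSO advances to numRecords + 1 = 4.
  assert(lastStableOffset(highWatermark = 4L, openTxnFirstOffsets = Seq.empty) == 4L)
}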