Repository: kafka Updated Branches: refs/heads/trunk 690575ec4 -> ea533f0c5
KAFKA-5913; Add hasOffset() and hasTimestamp() methods to RecordMetadata These methods help users check for cases in which this metadata was not returned by the broker (e.g. in the case of acks=0 or a duplicate error when idempotence is enabled). Author: Apurva Mehta <apu...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3878 from apurvam/KAFKA-5913-add-record-metadata-not-available-exception Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea533f0c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea533f0c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea533f0c Branch: refs/heads/trunk Commit: ea533f0c5e2438f0aef070dbadb023bffb9ce009 Parents: 690575e Author: Apurva Mehta <apu...@confluent.io> Authored: Thu Sep 21 14:30:37 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Sep 21 14:39:00 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/RecordMetadata.java | 23 +++++++++++++++++++- .../clients/producer/RecordMetadataTest.java | 2 ++ .../clients/producer/internals/SenderTest.java | 6 ++++- .../kafka/api/ProducerFailureHandlingTest.scala | 6 ++++- 4 files changed, 34 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ea533f0c/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java index 6757a6d..0924244 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java @@ -18,6 +18,8 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.DefaultRecord; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.ProduceResponse; /** * The metadata for a record that has been acknowledged by the server @@ -65,17 +67,36 @@ public final class RecordMetadata { } /** + * Indicates whether the record metadata includes the offset. + * @return true if the offset is included in the metadata, false otherwise. + */ + public boolean hasOffset() { + return this.offset != ProduceResponse.INVALID_OFFSET; + } + + /** * The offset of the record in the topic/partition. + * @return the offset of the record, or -1 if {{@link #hasOffset()}} returns false. */ public long offset() { return this.offset; } /** + * Indicates whether the record metadata includes the timestamp. + * @return true if a valid timestamp exists, false otherwise. + */ + public boolean hasTimestamp() { + return this.timestamp != RecordBatch.NO_TIMESTAMP; + } + + /** * The timestamp of the record in the topic/partition. + * + * @return the timestamp of the record, or -1 if the {{@link #hasTimestamp()}} returns false. */ public long timestamp() { - return timestamp; + return this.timestamp; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ea533f0c/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java index a735a61..bc3ffc7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.record.DefaultRecord; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; public class RecordMetadataTest { @@ -37,6 +38,7 @@ public class RecordMetadataTest { assertEquals(tp.topic(), metadata.topic()); assertEquals(tp.partition(), metadata.partition()); assertEquals(timestamp, metadata.timestamp()); + assertFalse(metadata.hasOffset()); assertEquals(-1L, metadata.offset()); assertEquals(checksum.longValue(), metadata.checksum()); assertEquals(keySize, metadata.serializedKeySize()); http://git-wip-us.apache.org/repos/asf/kafka/blob/ea533f0c/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 0995e36..e1ea10a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -1154,7 +1154,7 @@ public class SenderTest { assertEquals(1000, transactionManager.lastAckedOffset(tp0)); assertEquals(1, transactionManager.lastAckedSequence(tp0)); - client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.DUPLICATE_SEQUENCE_NUMBER, -1)); + client.respondToRequest(firstClientRequest, produceResponse(tp0, ProduceResponse.INVALID_OFFSET, Errors.DUPLICATE_SEQUENCE_NUMBER, 0)); sender.run(time.milliseconds()); // receive response 0 @@ -1162,6 +1162,10 @@ public class SenderTest { assertEquals(1, transactionManager.lastAckedSequence(tp0)); assertEquals(1000, transactionManager.lastAckedOffset(tp0)); assertFalse(client.hasInFlightRequests()); + + RecordMetadata unknownMetadata = request1.get(); + assertFalse(unknownMetadata.hasOffset()); + assertEquals(-1L, unknownMetadata.offset()); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/ea533f0c/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 49a096a..e1b1c9d 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -92,7 +92,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { // send a too-large record val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) - assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L) + + val recordMetadata = producer1.send(record).get() + assertNotNull(recordMetadata) + assertFalse(recordMetadata.hasOffset) + assertEquals(-1L, recordMetadata.offset) } /**