Repository: kafka Updated Branches: refs/heads/trunk d2f50fc38 -> c5df2a8e3
KAFKA-1634; Bump up Offset Commit Request to v2 to add global retention and remove per-partition commit timestamp; reviewed by Joel Koshy and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5df2a8e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5df2a8e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5df2a8e Branch: refs/heads/trunk Commit: c5df2a8e3acca1e2c905fa6b78e73e09b1dd0cd7 Parents: d2f50fc Author: Guozhang Wang <[email protected]> Authored: Thu Mar 26 17:16:33 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Mar 26 17:16:33 2015 -0700 ---------------------------------------------------------------------- .../clients/consumer/internals/Coordinator.java | 8 +- .../apache/kafka/common/protocol/Protocol.java | 77 ++++++++---- .../kafka/common/protocol/types/Struct.java | 6 +- .../common/requests/OffsetCommitRequest.java | 103 +++++++++++++--- .../common/requests/RequestResponseTest.java | 4 +- .../scala/kafka/api/OffsetCommitRequest.scala | 73 +++++++---- .../scala/kafka/api/OffsetFetchRequest.scala | 13 +- .../kafka/common/OffsetMetadataAndError.scala | 64 ++++++---- .../consumer/ZookeeperConsumerConnector.scala | 2 +- .../src/main/scala/kafka/server/KafkaApis.scala | 34 +++++- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../main/scala/kafka/server/KafkaServer.scala | 1 + .../main/scala/kafka/server/OffsetManager.scala | 97 +++++++++++---- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../api/RequestResponseSerializationTest.scala | 50 +++++--- .../unit/kafka/server/OffsetCommitTest.scala | 122 +++++++++++++++---- .../unit/kafka/server/ServerShutdownTest.scala | 4 +- 17 files changed, 490 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index 436f9b2..8d44814 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -162,8 +162,12 @@ public final class Coordinator { Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData; offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size()); for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) - offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, "")); - OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, this.generation, this.consumerId, offsetData); + offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), "")); + OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, + this.generation, + this.consumerId, + OffsetCommitRequest.DEFAULT_RETENTION_TIME, + offsetData); // send request and possibly wait for response if it is blocking RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets); http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 101f382..9c4518e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -89,31 +89,24 @@ public class Protocol { /* Produce api */ public static final Schema TOPIC_PRODUCE_DATA_V0 
= new Schema(new Field("topic", STRING), - new Field("data", - new ArrayOf(new Schema(new Field("partition", - INT32), - new Field("record_set", - BYTES))))); + new Field("data", new ArrayOf(new Schema(new Field("partition", INT32), + new Field("record_set", BYTES))))); public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks", - INT16, - "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."), - new Field("timeout", - INT32, - "The time to await a response in ms."), - new Field("topic_data", - new ArrayOf(TOPIC_PRODUCE_DATA_V0))); + INT16, + "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."), + new Field("timeout", INT32, "The time to await a response in ms."), + new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0))); public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses", - new ArrayOf(new Schema(new Field("topic", - STRING), - new Field("partition_responses", - new ArrayOf(new Schema(new Field("partition", - INT32), - new Field("error_code", - INT16), - new Field("base_offset", - INT64)))))))); + new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition_responses", + new ArrayOf(new Schema(new Field("partition", + INT32), + new Field("error_code", + INT16), + new Field("base_offset", + INT64)))))))); public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0}; public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0}; @@ -132,6 +125,16 @@ public class Protocol { STRING, "Any associated metadata the client wants to keep.")); + public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("offset", + INT64, + "Message offset to be committed."), + new Field("metadata", + STRING, + "Any associated metadata the client wants to keep.")); + public static final Schema 
OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to commit."), @@ -139,6 +142,13 @@ public class Protocol { new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0), "Partitions to commit offsets.")); + public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic", + STRING, + "Topic to commit."), + new Field("partitions", + new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1), + "Partitions to commit offsets.")); + public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."), @@ -159,10 +169,27 @@ public class Protocol { new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), "Topics to commit offsets.")); + public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id", + STRING, + "The consumer group id."), + new Field("group_generation_id", + INT32, + "The generation of the consumer group."), + new Field("consumer_id", + STRING, + "The consumer id assigned by the group coordinator."), + new Field("retention_time", + INT64, + "Time period in ms to retain the offset."), + new Field("topics", + new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1), + "Topics to commit offsets.")); + public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), - new Field("error_code", INT16)); + new Field("error_code", + INT16)); public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), new Field("partition_responses", @@ -171,9 +198,9 @@ public class Protocol { public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0))); - public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1}; - /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. 
*/ - public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0}; + public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2}; + /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */ + public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0}; /* Offset fetch api */ public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 7672a3a..92de6a9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -52,7 +52,7 @@ public class Struct { else if (field.defaultValue != Field.NO_DEFAULT) return field.defaultValue; else - throw new SchemaException("Missing value for field '" + field.name + " which has no default value."); + throw new SchemaException("Missing value for field '" + field.name + "' which has no default value."); } /** @@ -191,7 +191,7 @@ public class Struct { ArrayOf array = (ArrayOf) field.type(); return new Struct((Schema) array.type()); } else { - throw new SchemaException("Field " + field.name + " is not a container type, it is of type " + field.type()); + throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type()); } } @@ -234,7 +234,7 @@ public class Struct { */ private void validateField(Field field) { if (this.schema != 
field.schema) - throw new SchemaException("Attempt to access field '" + field.name + " from a different schema instance."); + throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance."); if (field.index > values.length) throw new SchemaException("Invalid field index: " + field.index); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 94e9d37..b92f670 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -29,12 +29,12 @@ import org.apache.kafka.common.utils.CollectionUtils; * This wrapper supports both v0 and v1 of OffsetCommitRequest. 
*/ public class OffsetCommitRequest extends AbstractRequestResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id); private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String GENERATION_ID_KEY_NAME = "group_generation_id"; private static final String CONSUMER_ID_KEY_NAME = "consumer_id"; private static final String TOPICS_KEY_NAME = "topics"; + private static final String RETENTION_TIME_KEY_NAME = "retention_time"; // topic level field names private static final String TOPIC_KEY_NAME = "topic"; @@ -43,27 +43,44 @@ public class OffsetCommitRequest extends AbstractRequestResponse { // partition level field names private static final String PARTITION_KEY_NAME = "partition"; private static final String COMMIT_OFFSET_KEY_NAME = "offset"; - private static final String TIMESTAMP_KEY_NAME = "timestamp"; private static final String METADATA_KEY_NAME = "metadata"; + @Deprecated + private static final String TIMESTAMP_KEY_NAME = "timestamp"; // for v0, v1 + + // default values for the current version public static final int DEFAULT_GENERATION_ID = -1; public static final String DEFAULT_CONSUMER_ID = ""; + public static final long DEFAULT_RETENTION_TIME = -1L; + + // default values for old versions, + // will be removed after these versions are deprecated + @Deprecated + public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1 private final String groupId; - private final int generationId; private final String consumerId; + private final int generationId; + private final long retentionTime; private final Map<TopicPartition, PartitionData> offsetData; public static final class PartitionData { + @Deprecated + public final long timestamp; // for V0, V1 + public final long offset; - public final long timestamp; public final String metadata; + @Deprecated public PartitionData(long offset, long timestamp, String metadata) { this.offset = offset; this.timestamp = timestamp; this.metadata = 
metadata; } + + public PartitionData(long offset, String metadata) { + this(offset, DEFAULT_TIMESTAMP, metadata); + } } /** @@ -78,6 +95,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { this.groupId = groupId; this.generationId = DEFAULT_GENERATION_ID; this.consumerId = DEFAULT_CONSUMER_ID; + this.retentionTime = DEFAULT_RETENTION_TIME; this.offsetData = offsetData; } @@ -88,15 +106,39 @@ public class OffsetCommitRequest extends AbstractRequestResponse { * @param consumerId * @param offsetData */ + @Deprecated public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) { + super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1))); + + initCommonFields(groupId, offsetData); + struct.set(GENERATION_ID_KEY_NAME, generationId); + struct.set(CONSUMER_ID_KEY_NAME, consumerId); + this.groupId = groupId; + this.generationId = generationId; + this.consumerId = consumerId; + this.retentionTime = DEFAULT_RETENTION_TIME; + this.offsetData = offsetData; + } + + /** + * Constructor for version 2. 
+ * @param groupId + * @param generationId + * @param consumerId + * @param retentionTime + * @param offsetData + */ + public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) { super(new Struct(CURRENT_SCHEMA)); initCommonFields(groupId, offsetData); struct.set(GENERATION_ID_KEY_NAME, generationId); struct.set(CONSUMER_ID_KEY_NAME, consumerId); + struct.set(RETENTION_TIME_KEY_NAME, retentionTime); this.groupId = groupId; this.generationId = generationId; this.consumerId = consumerId; + this.retentionTime = retentionTime; this.offsetData = offsetData; } @@ -105,7 +147,8 @@ public class OffsetCommitRequest extends AbstractRequestResponse { struct.set(GROUP_ID_KEY_NAME, groupId); List<Struct> topicArray = new ArrayList<Struct>(); - for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) { + + for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) { Struct topicData = struct.instance(TOPICS_KEY_NAME); topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); List<Struct> partitionArray = new ArrayList<Struct>(); @@ -114,7 +157,9 @@ public class OffsetCommitRequest extends AbstractRequestResponse { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); - partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); + // Only for v0 and v1 + if (partitionData.hasField(TIMESTAMP_KEY_NAME)) + partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); partitionArray.add(partitionData); } @@ -126,20 +171,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { public OffsetCommitRequest(Struct struct) { super(struct); - offsetData = new HashMap<TopicPartition, PartitionData>(); - for (Object 
topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.getString(TOPIC_KEY_NAME); - for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.getInt(PARTITION_KEY_NAME); - long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME); - long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); - String metadata = partitionResponse.getString(METADATA_KEY_NAME); - PartitionData partitionData = new PartitionData(offset, timestamp, metadata); - offsetData.put(new TopicPartition(topic, partition), partitionData); - } - } + groupId = struct.getString(GROUP_ID_KEY_NAME); // This field only exists in v1. if (struct.hasField(GENERATION_ID_KEY_NAME)) @@ -152,6 +184,33 @@ public class OffsetCommitRequest extends AbstractRequestResponse { consumerId = struct.getString(CONSUMER_ID_KEY_NAME); else consumerId = DEFAULT_CONSUMER_ID; + + // This field only exists in v2 + if (struct.hasField(RETENTION_TIME_KEY_NAME)) + retentionTime = struct.getLong(RETENTION_TIME_KEY_NAME); + else + retentionTime = DEFAULT_RETENTION_TIME; + + offsetData = new HashMap<TopicPartition, PartitionData>(); + for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) { + Struct topicData = (Struct) topicDataObj; + String topic = topicData.getString(TOPIC_KEY_NAME); + for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionDataStruct = (Struct) partitionDataObj; + int partition = partitionDataStruct.getInt(PARTITION_KEY_NAME); + long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME); + String metadata = partitionDataStruct.getString(METADATA_KEY_NAME); + PartitionData partitionOffset; + // This field only exists in v0 and v1 + if (partitionDataStruct.hasField(TIMESTAMP_KEY_NAME)) { + long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_NAME); + 
partitionOffset = new PartitionData(offset, timestamp, metadata); + } else { + partitionOffset = new PartitionData(offset, metadata); + } + offsetData.put(new TopicPartition(topic, partition), partitionOffset); + } + } } public String groupId() { @@ -166,6 +225,10 @@ public class OffsetCommitRequest extends AbstractRequestResponse { return consumerId; } + public long retentionTime() { + return retentionTime; + } + public Map<TopicPartition, PartitionData> offsetData() { return offsetData; } http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 13237fd..61a767a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -140,8 +140,8 @@ public class RequestResponseTest { private AbstractRequestResponse createOffsetCommitRequest() { Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(); - commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, 1000000, "")); - return new OffsetCommitRequest("group1", 100, "consumer1", commitData); + commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, "")); + return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData); } private AbstractRequestResponse createOffsetCommitResponse() { http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/api/OffsetCommitRequest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 050615c..cf8e6ac 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -21,21 +21,19 @@ import java.nio.ByteBuffer import kafka.api.ApiUtils._ import kafka.utils.{SystemTime, Logging} import kafka.network.{RequestChannel, BoundedByteBufferSend} -import kafka.common.{OffsetAndMetadata, ErrorMapping, TopicAndPartition} +import kafka.common.{OffsetMetadata, OffsetAndMetadata, ErrorMapping, TopicAndPartition} import kafka.network.RequestChannel.Response import scala.collection._ object OffsetCommitRequest extends Logging { - val CurrentVersion: Short = 1 + val CurrentVersion: Short = 2 val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetCommitRequest = { - val now = SystemTime.milliseconds - // Read values from the envelope val versionId = buffer.getShort - assert(versionId == 0 || versionId == 1, - "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.") + assert(versionId == 0 || versionId == 1 || versionId == 2, + "Version " + versionId + " is invalid for OffsetCommitRequest. 
Valid versions are 0, 1 or 2.") val correlationId = buffer.getInt val clientId = readShortString(buffer) @@ -43,13 +41,25 @@ object OffsetCommitRequest extends Logging { // Read the OffsetRequest val consumerGroupId = readShortString(buffer) - // version 1 specific fields - var groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID - var consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID - if (versionId == 1) { - groupGenerationId = buffer.getInt - consumerId = readShortString(buffer) - } + // version 1 and 2 specific fields + val groupGenerationId: Int = + if (versionId >= 1) + buffer.getInt + else + org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID + + val consumerId: String = + if (versionId >= 1) + readShortString(buffer) + else + org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID + + // version 2 specific fields + val retentionMs: Long = + if (versionId >= 2) + buffer.getLong + else + org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { @@ -59,14 +69,18 @@ object OffsetCommitRequest extends Logging { val partitionId = buffer.getInt val offset = buffer.getLong val timestamp = { - val given = buffer.getLong - if (given == -1L) now else given + if (versionId <= 1) + buffer.getLong + else + org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP } val metadata = readShortString(buffer) + (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp)) }) }) - OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId) + + OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId, retentionMs) } } @@ -76,11 +90,12 @@ case class OffsetCommitRequest(groupId: 
String, correlationId: Int = 0, clientId: String = OffsetCommitRequest.DefaultClientId, groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID, - consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID) + consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID, + retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) { - assert(versionId == 0 || versionId == 1, - "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.") + assert(versionId == 0 || versionId == 1 || versionId == 2, + "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0, 1 or 2.") lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) @@ -93,11 +108,17 @@ case class OffsetCommitRequest(groupId: String, // Write OffsetCommitRequest writeShortString(buffer, groupId) // consumer group - // version 1 specific data - if (versionId == 1) { + // version 1 and 2 specific data + if (versionId >= 1) { buffer.putInt(groupGenerationId) writeShortString(buffer, consumerId) } + + // version 2 or above specific data + if (versionId >= 2) { + buffer.putLong(retentionMs) + } + buffer.putInt(requestInfoGroupedByTopic.size) // number of topics requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError] writeShortString(buffer, t1._1) // topic @@ -105,7 +126,9 @@ case class OffsetCommitRequest(groupId: String, t1._2.foreach( t2 => { buffer.putInt(t2._1.partition) buffer.putLong(t2._2.offset) - buffer.putLong(t2._2.timestamp) + // version 0 and 1 specific data + if (versionId <= 1) + buffer.putLong(t2._2.commitTimestamp) writeShortString(buffer, t2._2.metadata) }) }) @@ -116,7 +139,8 @@ case class OffsetCommitRequest(groupId: String, 4 + /* correlationId */ 
shortStringLength(clientId) + shortStringLength(groupId) + - (if (versionId == 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) + + (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) + + (if (versionId >= 2) 8 /* retention time */ else 0) + 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => { val (topic, offsets) = topicAndOffsets @@ -127,7 +151,7 @@ case class OffsetCommitRequest(groupId: String, innerCount + 4 /* partition */ + 8 /* offset */ + - 8 /* timestamp */ + + (if (versionId <= 1) 8 else 0) /* timestamp */ + shortStringLength(offsetAndMetadata._2.metadata) }) }) @@ -149,6 +173,7 @@ case class OffsetCommitRequest(groupId: String, offsetCommitRequest.append("; GroupId: " + groupId) offsetCommitRequest.append("; GroupGenerationId: " + groupGenerationId) offsetCommitRequest.append("; ConsumerId: " + consumerId) + offsetCommitRequest.append("; RetentionMs: " + retentionMs) if(details) offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(",")) offsetCommitRequest.toString() http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/api/OffsetFetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index c7604b9..67811a7 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -17,13 +17,17 @@ package kafka.api -import java.nio.ByteBuffer - import kafka.api.ApiUtils._ import kafka.utils.Logging import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common._ +import kafka.common.TopicAndPartition import kafka.network.RequestChannel.Response -import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition} + +import scala.Some + +import 
java.nio.ByteBuffer + object OffsetFetchRequest extends Logging { val CurrentVersion: Short = 0 val DefaultClientId = "" @@ -91,8 +95,7 @@ case class OffsetFetchRequest(groupId: String, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val responseMap = requestInfo.map { case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError( - offset = OffsetAndMetadata.InvalidOffset, - error = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) + ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) )) }.toMap val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId) http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index 1584a92..139913f 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -17,40 +17,60 @@ package kafka.common -case class OffsetAndMetadata(offset: Long, - metadata: String = OffsetAndMetadata.NoMetadata, - timestamp: Long = -1L) { - override def toString = "OffsetAndMetadata[%d,%s%s]" - .format(offset, - if (metadata != null && metadata.length > 0) metadata else "NO_METADATA", - if (timestamp == -1) "" else "," + timestamp.toString) +case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) { + override def toString = "OffsetMetadata[%d,%s]" + .format(offset, + if (metadata != null && metadata.length > 0) metadata else "NO_METADATA") } -object OffsetAndMetadata { +object OffsetMetadata { val InvalidOffset: Long = -1L val NoMetadata: String = "" - val InvalidTime: Long = -1L + + val InvalidOffsetMetadata = 
OffsetMetadata(OffsetMetadata.InvalidOffset, OffsetMetadata.NoMetadata) +} + +case class OffsetAndMetadata(offsetMetadata: OffsetMetadata, + commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP, + expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) { + + def offset() = offsetMetadata.offset + + def metadata() = offsetMetadata.metadata + + override def toString = "[%s,CommitTime %d,ExpirationTime %d]".format(offsetMetadata, commitTimestamp, expireTimestamp) } -case class OffsetMetadataAndError(offset: Long, - metadata: String = OffsetAndMetadata.NoMetadata, - error: Short = ErrorMapping.NoError) { +object OffsetAndMetadata { + def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, expireTimestamp) + + def apply(offset: Long, metadata: String, timestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), timestamp) + + def apply(offset: Long, metadata: String) = new OffsetAndMetadata(OffsetMetadata(offset, metadata)) - def this(offsetMetadata: OffsetAndMetadata, error: Short) = - this(offsetMetadata.offset, offsetMetadata.metadata, error) + def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata)) +} - def this(error: Short) = - this(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, error) +case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = ErrorMapping.NoError) { + def offset = offsetMetadata.offset - def asTuple = (offset, metadata, error) + def metadata = offsetMetadata.metadata - override def toString = "OffsetMetadataAndError[%d,%s,%d]".format(offset, metadata, error) + override def toString = "[%s,ErrorCode %d]".format(offsetMetadata, error) } object OffsetMetadataAndError { - val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, 
ErrorMapping.NoOffsetsCommittedCode) - val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode) - val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode) - val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode) + val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NoError) + val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.OffsetsLoadInProgressCode) + val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.UnknownTopicOrPartitionCode) + val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NotCoordinatorForConsumerCode) + + def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), ErrorMapping.NoError) + + def apply(error: Short) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error) + + def apply(offset: Long, metadata: String, error: Short) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error) } + + http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index cca815a..b1cf0db 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -400,7 +400,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, 
val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1 offsetString match { - case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError)) + case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong)) case None => (topicPartition, OffsetMetadataAndError.NoOffset) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 35af98f..c33e848 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -149,12 +149,44 @@ class KafkaApis(val requestChannel: RequestChannel, val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } + + // compute the retention time based on the request version: + // if it is before v2 or not specified by user, we can use the default retention + val offsetRetention = + if (offsetCommitRequest.versionId <= 1 || + offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) { + offsetManager.config.offsetsRetentionMs + } else { + offsetCommitRequest.retentionMs + } + + // commit timestamp is always set to now. + // "default" expiration timestamp is now + retention (and retention may be overridden if v2) + // expire timestamp is computed differently for v1 and v2. + // - If v1 and no explicit commit timestamp is provided we use default expiration timestamp. 
+ // - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp + // - If v2 we use the default expiration timestamp + val currentTimestamp = SystemTime.milliseconds + val defaultExpireTimestamp = offsetRetention + currentTimestamp + + val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata => + offsetAndMetadata.copy( + commitTimestamp = currentTimestamp, + expireTimestamp = { + if (offsetAndMetadata.commitTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) + defaultExpireTimestamp + else + offsetRetention + offsetAndMetadata.commitTimestamp + } + ) + ) + // call offset manager to store offsets offsetManager.storeOffsets( offsetCommitRequest.groupId, offsetCommitRequest.consumerId, offsetCommitRequest.groupGenerationId, - offsetCommitRequest.requestInfo, + offsetData, sendResponseCallback) } http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 46d21c7..422451a 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -336,7 +336,7 @@ object KafkaConfig { val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)" val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads" val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits" - val OffsetsRetentionMinutesDoc = "Offsets older than this retention period will be discarded" + val OffsetsRetentionMinutesDoc = "Log retention window in minutes for offsets topic" val 
OffsetsRetentionCheckIntervalMsDoc = "Frequency at which to check for stale offsets" val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + "or this timeout is reached. This is similar to the producer request timeout." http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index dddef93..4db3384 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -402,6 +402,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg maxMetadataSize = config.offsetMetadataMaxSize, loadBufferSize = config.offsetsLoadBufferSize, offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, + offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, offsetsTopicNumPartitions = config.offsetsTopicPartitions, offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index d05e14d..395b1db 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.ZkClient - /** * Configuration settings for in-built offset management * @param maxMetadataSize The maximum allowed metadata for any offset commit. 
@@ -62,7 +61,7 @@ import org.I0Itec.zkclient.ZkClient */ case class OffsetManagerConfig(maxMetadataSize: Int = OffsetManagerConfig.DefaultMaxMetadataSize, loadBufferSize: Int = OffsetManagerConfig.DefaultLoadBufferSize, - offsetsRetentionMs: Long = 24*60*60000L, + offsetsRetentionMs: Long = OffsetManagerConfig.DefaultOffsetRetentionMs, offsetsRetentionCheckIntervalMs: Long = OffsetManagerConfig.DefaultOffsetsRetentionCheckIntervalMs, offsetsTopicNumPartitions: Int = OffsetManagerConfig.DefaultOffsetsTopicNumPartitions, offsetsTopicSegmentBytes: Int = OffsetManagerConfig.DefaultOffsetsTopicSegmentBytes, @@ -74,6 +73,7 @@ case class OffsetManagerConfig(maxMetadataSize: Int = OffsetManagerConfig.Defaul object OffsetManagerConfig { val DefaultMaxMetadataSize = 4096 val DefaultLoadBufferSize = 5*1024*1024 + val DefaultOffsetRetentionMs = 24*60*60*1000L val DefaultOffsetsRetentionCheckIntervalMs = 600000L val DefaultOffsetsTopicNumPartitions = 50 val DefaultOffsetsTopicSegmentBytes = 100*1024*1024 @@ -120,9 +120,11 @@ class OffsetManager(val config: OffsetManagerConfig, debug("Compacting offsets cache.") val startMs = SystemTime.milliseconds - val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs) + val staleOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) => + offsetAndMetadata.expireTimestamp < startMs + } - debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs)) + debug("Found %d expired offsets.".format(staleOffsets.size)) // delete the stale offsets from the table and generate tombstone messages to remove them from the log val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) => @@ -380,8 +382,17 @@ class OffsetManager(val config: OffsetManagerConfig, else trace("Ignoring redundant tombstone for %s.".format(key)) } else { + // special handling for version 0: + // set the expiration time stamp as commit time 
stamp + server default retention time val value = OffsetManager.readMessageValue(msgAndOffset.message.payload) - putOffset(key, value) + putOffset(key, value.copy ( + expireTimestamp = { + if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) + value.commitTimestamp + config.offsetsRetentionMs + else + value.expireTimestamp + } + )) trace("Loaded offset %s for %s.".format(value, key)) } currOffset = msgAndOffset.nextOffset @@ -446,7 +457,7 @@ object OffsetManager { private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema) - private val CURRENT_OFFSET_SCHEMA_VERSION = 0.toShort + private val CURRENT_OFFSET_SCHEMA_VERSION = 1.toShort private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING), new Field("topic", STRING), @@ -458,12 +469,24 @@ object OffsetManager { private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), new Field("metadata", STRING, "Associated metadata.", ""), new Field("timestamp", INT64)) - private val VALUE_OFFSET_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val VALUE_METADATA_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val VALUE_TIMESTAMP_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") + + private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), + new Field("metadata", STRING, "Associated metadata.", ""), + new Field("commit_timestamp", INT64), + new Field("expire_timestamp", INT64)) + + private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") + private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") + private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") + + private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") + private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") + private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") + private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") // map of versions to schemas - private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0)) + private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0), + 1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1)) private val CURRENT_SCHEMA = schemaFor(CURRENT_OFFSET_SCHEMA_VERSION) @@ -480,7 +503,7 @@ object OffsetManager { * * @return key for offset commit message */ - def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = { + private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = { val key = new Struct(CURRENT_SCHEMA.keySchema) key.set(KEY_GROUP_FIELD, group) key.set(KEY_TOPIC_FIELD, topic) @@ -498,12 +521,13 @@ object OffsetManager { * @param offsetAndMetadata consumer's current offset and metadata * @return payload for offset commit message */ - def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = { + private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = { + // generate commit value with schema version 1 val value = new Struct(CURRENT_SCHEMA.valueSchema) - value.set(VALUE_OFFSET_FIELD, offsetAndMetadata.offset) - value.set(VALUE_METADATA_FIELD, offsetAndMetadata.metadata) - value.set(VALUE_TIMESTAMP_FIELD, offsetAndMetadata.timestamp) - + value.set(VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset) + value.set(VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata) + value.set(VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp) + value.set(VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp) val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) 
byteBuffer.putShort(CURRENT_OFFSET_SCHEMA_VERSION) value.writeTo(byteBuffer) @@ -516,7 +540,7 @@ object OffsetManager { * * @param buffer input byte-buffer * @return a GroupTopicPartition object */ - def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = { + private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = { val version = buffer.getShort() val keySchema = schemaFor(version).keySchema val key = keySchema.read(buffer).asInstanceOf[Struct] @@ -534,19 +558,40 @@ object OffsetManager { * @param buffer input byte-buffer * @return an offset-metadata object from the message */ - def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = { - if(buffer == null) { // tombstone + private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = { + val structAndVersion = readMessageValueStruct(buffer) + + if (structAndVersion.value == null) { // tombstone null } else { + if (structAndVersion.version == 0) { + val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long] + val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String] + val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long] + + OffsetAndMetadata(offset, metadata, timestamp) + } else if (structAndVersion.version == 1) { + val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long] + val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String] + val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long] + val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long] + + OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp) + } else { + throw new IllegalStateException("Unknown offset message version") + } + } + } + + private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = { + if(buffer == null) { // tombstone + 
MessageValueStructAndVersion(null, -1) + } else { val version = buffer.getShort() val valueSchema = schemaFor(version).valueSchema val value = valueSchema.read(buffer).asInstanceOf[Struct] - val offset = value.get(VALUE_OFFSET_FIELD).asInstanceOf[Long] - val metadata = value.get(VALUE_METADATA_FIELD).asInstanceOf[String] - val timestamp = value.get(VALUE_TIMESTAMP_FIELD).asInstanceOf[Long] - - OffsetAndMetadata(offset, metadata, timestamp) + MessageValueStructAndVersion(value, version) } } @@ -555,7 +600,7 @@ object OffsetManager { class OffsetsMessageFormatter extends MessageFormatter { def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { val formattedKey = if (key == null) "NULL" else OffsetManager.readMessageKey(ByteBuffer.wrap(key)).toString - val formattedValue = if (value == null) "NULL" else OffsetManager.readMessageValue(ByteBuffer.wrap(value)).toString + val formattedValue = if (value == null) "NULL" else OffsetManager.readMessageValueStruct(ByteBuffer.wrap(value)).value.toString output.write(formattedKey.getBytes) output.write("::".getBytes) output.write(formattedValue.getBytes) @@ -565,6 +610,8 @@ object OffsetManager { } +case class MessageValueStructAndVersion(value: Struct, version: Short) + case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) { def this(group: String, topic: String, partition: Int) = http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index c527482..44f0026 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -65,7 +65,7 @@ object ReplicaManager { } class ReplicaManager(val config: KafkaConfig, - time: Time, + private val time: Time, val zkClient: ZkClient, 
scheduler: Scheduler, val logManager: LogManager, http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index fba852a..4cb803f 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -17,20 +17,21 @@ package kafka.api -import org.junit._ -import org.scalatest.junit.JUnitSuite -import junit.framework.Assert._ -import java.nio.ByteBuffer -import kafka.message.{Message, ByteBufferMessageSet} +import kafka.common._ import kafka.cluster.Broker -import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError} +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.message.{Message, ByteBufferMessageSet} import kafka.utils.SystemTime + +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests._ -import org.apache.kafka.common.protocol.ApiKeys + import scala.Some -import kafka.controller.LeaderIsrAndControllerEpoch -import kafka.common.TopicAndPartition -import org.apache.kafka.common.TopicPartition +import java.nio.ByteBuffer + +import org.junit._ +import org.scalatest.junit.JUnitSuite +import junit.framework.Assert._ object SerializationTestUtils { @@ -151,10 +152,23 @@ object SerializationTestUtils { new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1) } + def createTestOffsetCommitRequestV2: OffsetCommitRequest = { + new OffsetCommitRequest( + groupId = "group 1", + retentionMs = SystemTime.milliseconds, + requestInfo=collection.immutable.Map( + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata"), + TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, 
OffsetMetadata.NoMetadata) + )) + } + def createTestOffsetCommitRequestV1: OffsetCommitRequest = { - new OffsetCommitRequest("group 1", collection.immutable.Map( - TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds), - TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds) + new OffsetCommitRequest( + versionId = 1, + groupId = "group 1", + requestInfo = collection.immutable.Map( + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", SystemTime.milliseconds), + TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, SystemTime.milliseconds) )) } @@ -163,8 +177,8 @@ object SerializationTestUtils { versionId = 0, groupId = "group 1", requestInfo = collection.immutable.Map( - TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds), - TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds) + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", SystemTime.milliseconds), + TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, SystemTime.milliseconds) )) } @@ -183,7 +197,7 @@ object SerializationTestUtils { def createTestOffsetFetchResponse: OffsetFetchResponse = { new OffsetFetchResponse(collection.immutable.Map( TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", ErrorMapping.NoError), - TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode) + TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode) )) } @@ -232,6 +246,7 @@ class RequestResponseSerializationTest extends JUnitSuite { private val 
topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse private val offsetCommitRequestV0 = SerializationTestUtils.createTestOffsetCommitRequestV0 private val offsetCommitRequestV1 = SerializationTestUtils.createTestOffsetCommitRequestV1 + private val offsetCommitRequestV2 = SerializationTestUtils.createTestOffsetCommitRequestV2 private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse @@ -250,7 +265,8 @@ class RequestResponseSerializationTest extends JUnitSuite { collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse, stopReplicaRequest, stopReplicaResponse, producerRequest, producerResponse, fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest, - topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1, + topicMetadataResponse, + offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2, offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator, heartbeatRequest, http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index e4d0435..7654275 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -17,28 +17,33 @@ package kafka.server -import java.io.File -import kafka.utils._ -import junit.framework.Assert._ -import java.util.Properties +import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest} import 
kafka.consumer.SimpleConsumer -import org.junit.{After, Before, Test} +import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition} +import kafka.utils._ +import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness + +import org.junit.{After, Before, Test} import org.scalatest.junit.JUnit3Suite -import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest} -import kafka.utils.TestUtils._ -import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition} + +import java.util.Properties +import java.io.File + import scala.util.Random import scala.collection._ +import junit.framework.Assert._ + class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val random: Random = new Random() + val brokerPort: Int = 9099 + val group = "test-group" + val retentionCheckInterval: Long = 100L var logDir: File = null var topicLogDir: File = null var server: KafkaServer = null var logSize: Int = 100 - val brokerPort: Int = 9099 - val group = "test-group" var simpleConsumer: SimpleConsumer = null var time: Time = new MockTime() @@ -46,6 +51,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() val config: Properties = createBrokerConfig(1, brokerPort) + config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") + config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString) val logDirPath = config.getProperty("log.dir") logDir = new File(logDirPath) time = new MockTime() @@ -89,7 +96,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error) - assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata) + 
assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata) assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset) // Commit a new offset @@ -155,31 +162,37 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error) - assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error) + + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error) assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error) - assertEquals(ErrorMapping.NoOffsetsCommittedCode, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error) - assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error) assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) + + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error) assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get) + + assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error) assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get) assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata) 
assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata) assertEquals("metadata three", fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.metadata) - assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata) - assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata) - assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata) - assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.metadata) + + assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata) + assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata) + assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata) + assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.metadata) assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset) assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset) assertEquals(44L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.offset) assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset) - assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset) - assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset) - assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset) + + assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset) + 
assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset) + assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset) } @Test @@ -204,6 +217,73 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1) assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get) + } + + @Test + def testOffsetExpiration() { + // set up topic partition + val topic = "topic" + val topicPartition = TopicAndPartition(topic, 0) + createTopic(zkClient, topic, servers = Seq(server), numPartitions = 1) + + val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0))) + + // v0 version commit request with commit timestamp set to -1 + // should not expire + val commitRequest0 = OffsetCommitRequest( + groupId = group, + requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata", -1L)), + versionId = 0 + ) + assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get) + Thread.sleep(retentionCheckInterval * 2) + assertEquals(1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) + + // v1 version commit request with commit timestamp set to -1 + // should not expire + val commitRequest1 = OffsetCommitRequest( + groupId = group, + requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(2L, "metadata", -1L)), + versionId = 1 + ) + assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest1).commitStatus.get(topicPartition).get) + Thread.sleep(retentionCheckInterval * 2) + assertEquals(2L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) + + // v1 version commit request with commit timestamp set to now - two days + // should expire + val commitRequest2 = 
OffsetCommitRequest( + groupId = group, + requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", SystemTime.milliseconds - 2*24*60*60*1000L)), + versionId = 1 + ) + assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get) + Thread.sleep(retentionCheckInterval * 2) + assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) + + // v2 version commit request with retention time set to 1 hour + // should not expire + val commitRequest3 = OffsetCommitRequest( + groupId = group, + requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(4L, "metadata", -1L)), + versionId = 2, + retentionMs = 1000 * 60 * 60L + ) + assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest3).commitStatus.get(topicPartition).get) + Thread.sleep(retentionCheckInterval * 2) + assertEquals(4L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) + + // v2 version commit request with retention time set to 0 second + // should expire + val commitRequest4 = OffsetCommitRequest( + groupId = "test-group", + requestInfo = immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(5L, "metadata", -1L)), + versionId = 2, + retentionMs = 0L + ) + assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest4).commitStatus.get(topicPartition).get) + Thread.sleep(retentionCheckInterval * 2) + assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) } http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index b46daa4..71317eb 100644 --- 
a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -163,10 +163,10 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server.shutdown() server.awaitShutdown() server.shutdown() - assertTrue(true); + assertTrue(true) } catch{ - case ex => fail() + case ex: Throwable => fail() } } }
