Repository: kafka Updated Branches: refs/heads/trunk 70a784b64 -> 8de62253a
HOTFIX: Fix incorrect version used for group metadata version Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes #424 from hachikuji/hotfix-metadata-storage Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8de62253 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8de62253 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8de62253 Branch: refs/heads/trunk Commit: 8de62253adb4c6b90badbf92881c0402068cd65c Parents: 70a784b Author: Jason Gustafson <[email protected]> Authored: Wed Nov 4 15:02:32 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 4 15:02:32 2015 -0800 ---------------------------------------------------------------------- .../scala/kafka/coordinator/GroupMetadataManager.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8de62253/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index 0052b5d..0c8333f 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -673,12 +673,14 @@ object GroupMetadataManager { private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members") // map of versions to key schemas as data types - private val MESSAGE_TYPE_SCHEMAS = Map(0 -> OFFSET_COMMIT_KEY_SCHEMA, + private val MESSAGE_TYPE_SCHEMAS = Map( + 0 -> OFFSET_COMMIT_KEY_SCHEMA, 1 -> OFFSET_COMMIT_KEY_SCHEMA, 2 -> GROUP_METADATA_KEY_SCHEMA) // map of version of offset value schemas - private val OFFSET_VALUE_SCHEMAS = Map(0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0, + private val OFFSET_VALUE_SCHEMAS = Map( + 0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0, 1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1) private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort @@ -712,7 +714,7 @@ object GroupMetadataManager { val schemaOpt = GROUP_VALUE_SCHEMAS.get(version) schemaOpt match { case Some(schema) => schema - case _ => throw new KafkaException("Unknown offset schema version " + version) + case _ => throw new KafkaException("Unknown group metadata version " + version) } } @@ -762,7 +764,7 @@ object GroupMetadataManager { value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp) value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp) val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) - byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION) + byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION) value.writeTo(byteBuffer) byteBuffer.array() } @@ -804,7 +806,7 @@ object GroupMetadataManager { value.set(GROUP_METADATA_MEMBERS_V0, memberArray.toArray) val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) - byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION) + byteBuffer.putShort(CURRENT_GROUP_VALUE_SCHEMA_VERSION) value.writeTo(byteBuffer) byteBuffer.array() }
