Repository: kafka Updated Branches: refs/heads/trunk dbcbd7920 -> 7565dcd8b
KAFKA-4861; GroupMetadataManager record is rejected if broker configured with LogAppendTime The record should be created with CreateTime (like in the producer). The conversion to LogAppendTime happens automatically (if necessary). Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #2657 from ijuma/kafka-4861-log-append-time-breaks-group-data-manager Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7565dcd8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7565dcd8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7565dcd8 Branch: refs/heads/trunk Commit: 7565dcd8b0547f91a5d9d19771d9cd6693079d01 Parents: dbcbd79 Author: Ismael Juma <ism...@juma.me.uk> Authored: Thu Mar 9 16:45:41 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Mar 9 16:45:41 2017 -0800 ---------------------------------------------------------------------- .../coordinator/GroupMetadataManager.scala | 33 ++++---- .../src/main/scala/kafka/server/KafkaApis.scala | 2 +- .../scala/kafka/server/ReplicaManager.scala | 6 +- .../kafka/api/BaseConsumerTest.scala | 2 +- .../kafka/api/ConsumerBounceTest.scala | 3 +- .../kafka/api/FixedPortTestUtils.scala | 9 +- .../kafka/api/IntegrationTestHarness.scala | 2 +- .../kafka/api/LogAppendTimeTest.scala | 86 ++++++++++++++++++++ .../GroupCoordinatorResponseTest.scala | 12 +-- .../coordinator/GroupMetadataManagerTest.scala | 16 ++-- .../integration/KafkaServerTestHarness.scala | 4 +- 11 files changed, 128 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index a6ed6a9..d48328d 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -133,8 +133,8 @@ class GroupMetadataManager(val brokerId: Int, def prepareStoreGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], responseCallback: Errors => Unit): Option[DelayedStore] = { - getMagicAndTimestamp(partitionFor(group.groupId)) match { - case Some((magicValue, timestampType, timestamp)) => + getMagic(partitionFor(group.groupId)) match { + case Some(magicValue) => val groupMetadataValueVersion = { if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort @@ -142,6 +142,9 @@ class GroupMetadataManager(val brokerId: Int, GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION } + // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. + val timestampType = TimestampType.CREATE_TIME + val timestamp = time.milliseconds() val record = Record.create(magicValue, timestampType, timestamp, GroupMetadataManager.groupMetadataKey(group.groupId), GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion)) @@ -231,8 +234,11 @@ class GroupMetadataManager(val brokerId: Int, } // construct the message set to append - getMagicAndTimestamp(partitionFor(group.groupId)) match { - case Some((magicValue, timestampType, timestamp)) => + getMagic(partitionFor(group.groupId)) match { + case Some(magicValue) => + // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. + val timestampType = TimestampType.CREATE_TIME + val timestamp = time.milliseconds() val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) => Record.create(magicValue, timestampType, timestamp, GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition), @@ -575,8 +581,12 @@ class GroupMetadataManager(val brokerId: Int, val offsetsPartition = partitionFor(groupId) val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition) - getMagicAndTimestamp(offsetsPartition) match { - case Some((magicValue, timestampType, timestamp)) => + getMagic(offsetsPartition) match { + case Some(magicValue) => + // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. + val timestampType = TimestampType.CREATE_TIME + val timestamp = time.milliseconds() + val partitionOpt = replicaManager.getPartition(appendPartition) partitionOpt.foreach { partition => val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) => @@ -652,15 +662,10 @@ class GroupMetadataManager(val brokerId: Int, * Check if the replica is local and return the message format version and timestamp * * @param partition Partition of GroupMetadataTopic - * @return Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise + * @return Some(MessageFormatVersion) if replica is local, None otherwise */ - private def getMagicAndTimestamp(partition: Int): Option[(Byte, TimestampType, Long)] = { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partition) - replicaManager.getMagicAndTimestampType(groupMetadataTopicPartition).map { case (messageFormatVersion, timestampType) => - val timestamp = if (messageFormatVersion == Record.MAGIC_VALUE_V0) Record.NO_TIMESTAMP else time.milliseconds() - (messageFormatVersion, timestampType, timestamp) - } - } + private def getMagic(partition: Int): Option[Byte] = + replicaManager.getMagic(new TopicPartition(Topic.GroupMetadataTopicName, partition)) /** * Add the partition into the owned list http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fa5afe5..24a224a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -470,7 +470,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Please note that if the message format is changed from a higher version back to lower version this // test might break because some messages in new message format can be delivered to consumers before 0.10.0.0 // without format down conversion. - val convertedData = if (versionId <= 1 && replicaManager.getMagicAndTimestampType(tp).exists(_._1 > Record.MAGIC_VALUE_V0) && + val convertedData = if (versionId <= 1 && replicaManager.getMagic(tp).exists(_ > Record.MAGIC_VALUE_V0) && !data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) { trace(s"Down converting message to V0 for fetch request from $clientId") val downConvertedRecords = data.records.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE) http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1cec4a2..4ab8c2a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -618,10 +618,8 @@ class ReplicaManager(val config: KafkaConfig, quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync } - def getMagicAndTimestampType(topicPartition: TopicPartition): Option[(Byte, TimestampType)] = - getReplica(topicPartition).flatMap { replica => - replica.log.map(log => (log.config.messageFormatVersion.messageFormatVersion, log.config.messageTimestampType)) - } + def getMagic(topicPartition: TopicPartition): Option[Byte] = + getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion)) def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] = { replicaStateChangeLock synchronized { http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 802bab8..27b89d5 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.common.errors.WakeupException /** * Integration tests for the new consumer that cover basic usage as well as server failures */ -abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { +abstract class BaseConsumerTest extends IntegrationTestHarness { val epsilon = 0.1 val producerCount = 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index f0e0c9e..4ec77a1 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -46,7 +46,6 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { val executor = Executors.newScheduledThreadPool(2) // configure the servers and clients - this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout @@ -59,7 +58,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") override def generateConfigs() = { - FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect,enableControlledShutdown = false) + FixedPortTestUtils.createBrokerConfigs(serverCount, zkConnect, enableControlledShutdown = false) .map(KafkaConfig.fromProps(_, serverConfig)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala index d15a01d..bf5f8c1 100644 --- a/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala +++ b/core/src/test/scala/integration/kafka/api/FixedPortTestUtils.scala @@ -34,9 +34,7 @@ object FixedPortTestUtils { sockets.foreach(_.close()) ports } catch { - case e: IOException => { - throw new RuntimeException(e) - } + case e: IOException => throw new RuntimeException(e) } } @@ -45,8 +43,9 @@ object FixedPortTestUtils { enableControlledShutdown: Boolean = true, enableDeleteTopic: Boolean = false): Seq[Properties] = { val ports = FixedPortTestUtils.choosePorts(numConfigs) - (0 until numConfigs) - .map(node => TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node))) + (0 until numConfigs).map { node => + TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, ports(node)) + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 46465e8..5c8ceea 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -45,7 +45,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() - override def generateConfigs() = { + override def generateConfigs = { val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) cfgs.foreach { config => http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala new file mode 100644 index 0000000..4a97bea --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.api + +import java.util.Collections +import java.util.concurrent.TimeUnit + +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.record.TimestampType +import org.junit.{Before, Test} +import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +/** + * Tests where the broker is configured to use LogAppendTime. For tests where LogAppendTime is configured via topic + * level configs, see the *ProducerSendTest classes. + */ +class LogAppendTimeTest extends IntegrationTestHarness { + val producerCount: Int = 1 + val consumerCount: Int = 1 + val serverCount: Int = 2 + + // This will be used for the offsets topic as well + serverConfig.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.name) + serverConfig.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2") + + private val topic = "topic" + + @Before + override def setUp() { + super.setUp() + TestUtils.createTopic(zkUtils, topic, servers = servers) + } + + @Test + def testProduceConsume() { + val producer = producers.head + val now = System.currentTimeMillis() + val createTime = now - TimeUnit.DAYS.toMillis(1) + val producerRecords = (1 to 10).map(i => new ProducerRecord(topic, null, createTime, s"key$i".getBytes, + s"value$i".getBytes)) + val recordMetadatas = producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS)) + recordMetadatas.foreach { recordMetadata => + assertTrue(recordMetadata.timestamp >= now) + assertTrue(recordMetadata.timestamp < now + TimeUnit.SECONDS.toMillis(60)) + } + + val consumer = consumers.head + consumer.subscribe(Collections.singleton(topic)) + val consumerRecords = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]] + TestUtils.waitUntilTrue(() => { + consumerRecords ++= consumer.poll(50).asScala + consumerRecords.size == producerRecords.size + }, s"Consumed ${consumerRecords.size} records until timeout instead of the expected ${producerRecords.size} records") + + consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) => + val producerRecord = producerRecords(index) + val recordMetadata = recordMetadatas(index) + assertEquals(new String(producerRecord.key), new String(consumerRecord.key)) + assertEquals(new String(producerRecord.value), new String(consumerRecord.value)) + assertNotEquals(producerRecord.timestamp, consumerRecord.timestamp) + assertEquals(recordMetadata.timestamp, consumerRecord.timestamp) + assertEquals(TimestampType.LOG_APPEND_TIME, consumerRecord.timestampType) + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala index 7b40187..22cb899 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -305,8 +305,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None) - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) - .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes() EasyMock.replay(replicaManager) timer.advanceClock(DefaultSessionTimeout + 100) @@ -1055,8 +1054,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP) ) )}) - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) - .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes() EasyMock.replay(replicaManager) groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback) @@ -1137,8 +1135,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { new PartitionResponse(Errors.NONE, 0L, Record.NO_TIMESTAMP) ) )}) - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) - .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes() EasyMock.replay(replicaManager) groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback) @@ -1149,8 +1146,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { val (responseFuture, responseCallback) = setupHeartbeatCallback EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None) - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) - .andReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)).anyTimes() + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(Record.MAGIC_VALUE_V1)).anyTimes() EasyMock.replay(replicaManager) groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback) http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala index 86189aa..8cfae8d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala @@ -321,7 +321,7 @@ class GroupMetadataManagerTest { @Test def testStoreNonEmptyGroupWhenCoordinatorHasMoved() { - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None) val memberId = "memberId" val clientId = "clientId" val clientHost = "localhost" @@ -390,7 +390,7 @@ class GroupMetadataManagerTest { @Test def testCommitOffsetWhenCoordinatorHasMoved() { - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())).andReturn(None) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None) val memberId = "" val generationId = -1 val topicPartition = new TopicPartition("foo", 0) @@ -538,8 +538,7 @@ class GroupMetadataManagerTest { EasyMock.reset(partition) val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture() - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) - .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1)) EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) @@ -584,8 +583,7 @@ class GroupMetadataManagerTest { EasyMock.reset(partition) val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture() - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) - .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.LOG_APPEND_TIME)) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1)) EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) @@ -602,7 +600,8 @@ class GroupMetadataManagerTest { assertTrue(metadataTombstone.hasKey) assertTrue(metadataTombstone.hasNullValue) assertEquals(Record.MAGIC_VALUE_V1, metadataTombstone.magic) - assertEquals(TimestampType.LOG_APPEND_TIME, metadataTombstone.timestampType) + // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. + assertEquals(TimestampType.CREATE_TIME, metadataTombstone.timestampType) assertTrue(metadataTombstone.timestamp > 0) val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey] @@ -762,8 +761,7 @@ class GroupMetadataManagerTest { new PartitionResponse(error, 0L, Record.NO_TIMESTAMP) ) )}) - EasyMock.expect(replicaManager.getMagicAndTimestampType(EasyMock.anyObject())) - .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME)) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(Record.MAGIC_VALUE_V1)) } private def buildStableGroupRecordWithMember(memberId: String): Record = { http://git-wip-us.apache.org/repos/asf/kafka/blob/7565dcd8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 6fa7ad5..9f40ec6 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -47,7 +47,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every * test and should not reuse previous configurations unless they select their ports randomly when servers are started. */ - def generateConfigs(): Seq[KafkaConfig] + def generateConfigs: Seq[KafkaConfig] /** * Override this in case ACLs or security credentials must be set before `servers` are started. @@ -64,7 +64,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { def configs: Seq[KafkaConfig] = { if (instanceConfigs == null) - instanceConfigs = generateConfigs() + instanceConfigs = generateConfigs instanceConfigs }