http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/ProducerIdMapping.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerIdMapping.scala b/core/src/main/scala/kafka/log/ProducerIdMapping.scala new file mode 100644 index 0000000..a870b7d --- /dev/null +++ b/core/src/main/scala/kafka/log/ProducerIdMapping.scala @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log + +import java.io._ +import java.nio.ByteBuffer +import java.nio.file.Files + +import kafka.common.KafkaException +import kafka.utils.{Logging, nonthreadsafe} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{DuplicateSequenceNumberException, OutOfOrderSequenceException, ProducerFencedException} +import org.apache.kafka.common.protocol.types._ +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.utils.{ByteUtils, Crc32C} + +import scala.collection.{immutable, mutable} + +private[log] object ProducerIdEntry { + val Empty = ProducerIdEntry(RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + -1, 0, RecordBatch.NO_TIMESTAMP) +} + +private[log] case class ProducerIdEntry(epoch: Short, lastSeq: Int, lastOffset: Long, numRecords: Int, timestamp: Long) { + def firstSeq: Int = lastSeq - numRecords + 1 + def firstOffset: Long = lastOffset - numRecords + 1 + + def isDuplicate(batch: RecordBatch): Boolean = { + batch.producerEpoch == epoch && + batch.baseSequence == firstSeq && + batch.lastSequence == lastSeq + } +} + +private[log] class ProducerAppendInfo(val pid: Long, initialEntry: ProducerIdEntry) { + // the initialEntry here is the last successfully appended batch. We validate incoming entries transitively, starting + // with the last appended entry. + private var epoch = initialEntry.epoch + private var firstSeq = initialEntry.firstSeq + private var lastSeq = initialEntry.lastSeq + private var lastOffset = initialEntry.lastOffset + private var maxTimestamp = initialEntry.timestamp + + private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int) = { + if (this.epoch > epoch) { + throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer with a newer epoch. $epoch (request epoch), ${this.epoch} (server epoch)") + } else if (this.epoch == RecordBatch.NO_PRODUCER_EPOCH || this.epoch < epoch) { + if (firstSeq != 0) + throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $epoch " + + s"(request epoch), $firstSeq (seq. 
number)") + } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) { + throw new DuplicateSequenceNumberException(s"Duplicate sequence number: $pid (pid), $firstSeq " + + s"(seq. number), ${this.firstSeq} (expected seq. number)") + } else if (firstSeq != this.lastSeq + 1L) { + throw new OutOfOrderSequenceException(s"Invalid sequence number: $pid (pid), $firstSeq " + + s"(seq. number), ${this.lastSeq} (expected seq. number)") + } + } + + def assignLastOffsetAndTimestamp(lastOffset: Long, lastTimestamp: Long): Unit = { + this.lastOffset = lastOffset + this.maxTimestamp = lastTimestamp + } + + private def append(epoch: Short, firstSeq: Int, lastSeq: Int, lastTimestamp: Long, lastOffset: Long) { + validateAppend(epoch, firstSeq, lastSeq) + this.epoch = epoch + this.firstSeq = firstSeq + this.lastSeq = lastSeq + this.maxTimestamp = lastTimestamp + this.lastOffset = lastOffset + } + + def append(batch: RecordBatch): Unit = + append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp, batch.lastOffset) + + def append(entry: ProducerIdEntry): Unit = + append(entry.epoch, entry.firstSeq, entry.lastSeq, entry.timestamp, entry.lastOffset) + + def lastEntry: ProducerIdEntry = + ProducerIdEntry(epoch, lastSeq, lastOffset, lastSeq - firstSeq + 1, maxTimestamp) +} + +private[log] class CorruptSnapshotException(msg: String) extends KafkaException(msg) + +object ProducerIdMapping { + private val DirnamePrefix = "pid-mapping" + private val FilenameSuffix = "snapshot" + private val FilenamePattern = s"^\\d{1,}.$FilenameSuffix".r + private val PidSnapshotVersion: Short = 1 + + private val VersionField = "version" + private val CrcField = "crc" + private val PidField = "pid" + private val LastSequenceField = "last_sequence" + private val EpochField = "epoch" + private val LastOffsetField = "last_offset" + private val NumRecordsField = "num_records" + private val TimestampField = "timestamp" + private val PidEntriesField = "pid_entries" + + private val VersionOffset = 0 + private val CrcOffset = VersionOffset + 2 + private val PidEntriesOffset = CrcOffset + 4 + + private val maxPidSnapshotsToRetain = 2 + + val PidSnapshotEntrySchema = new Schema( + new Field(PidField, Type.INT64, "The producer ID"), + new Field(EpochField, Type.INT16, "Current epoch of the producer"), + new Field(LastSequenceField, Type.INT32, "Last written sequence of the producer"), + new Field(LastOffsetField, Type.INT64, "Last written offset of the producer"), + new Field(NumRecordsField, Type.INT32, "The number of records written in the last log entry"), + new Field(TimestampField, Type.INT64, "Max timestamp from the last written entry")) + val PidSnapshotMapSchema = new Schema( + new Field(VersionField, Type.INT16, "Version of the snapshot file"), + new Field(CrcField, Type.UNSIGNED_INT32, "CRC of the snapshot data"), + new Field(PidEntriesField, new ArrayOf(PidSnapshotEntrySchema), "The entries in the PID table")) + + private def loadSnapshot(file: File, pidMap: mutable.Map[Long, ProducerIdEntry], + checkNotExpired: (ProducerIdEntry) => Boolean) { + val buffer = Files.readAllBytes(file.toPath) + val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer)) + + val version = struct.getShort(VersionField) + if (version != PidSnapshotVersion) + throw new IllegalArgumentException(s"Unhandled snapshot file version $version") + + val crc = struct.getUnsignedInt(CrcField) + val computedCrc = Crc32C.compute(buffer, PidEntriesOffset, buffer.length - PidEntriesOffset) + if (crc != computedCrc) + throw new 
CorruptSnapshotException(s"Snapshot file is corrupted (CRC is no longer valid). Stored crc: ${crc}. Computed crc: ${computedCrc}") + + struct.getArray(PidEntriesField).foreach { pidEntryObj => + val pidEntryStruct = pidEntryObj.asInstanceOf[Struct] + val pid = pidEntryStruct.getLong(PidField) + val epoch = pidEntryStruct.getShort(EpochField) + val seq = pidEntryStruct.getInt(LastSequenceField) + val offset = pidEntryStruct.getLong(LastOffsetField) + val timestamp = pidEntryStruct.getLong(TimestampField) + val numRecords = pidEntryStruct.getInt(NumRecordsField) + val newEntry = ProducerIdEntry(epoch, seq, offset, numRecords, timestamp) + if (checkNotExpired(newEntry)) + pidMap.put(pid, newEntry) + } + } + + private def writeSnapshot(file: File, entries: mutable.Map[Long, ProducerIdEntry]) { + val struct = new Struct(PidSnapshotMapSchema) + struct.set(VersionField, PidSnapshotVersion) + struct.set(CrcField, 0L) // we'll fill this after writing the entries + val entriesArray = entries.map { + case (pid, entry) => + val pidEntryStruct = struct.instance(PidEntriesField) + pidEntryStruct.set(PidField, pid) + .set(EpochField, entry.epoch) + .set(LastSequenceField, entry.lastSeq) + .set(LastOffsetField, entry.lastOffset) + .set(NumRecordsField, entry.numRecords) + .set(TimestampField, entry.timestamp) + pidEntryStruct + }.toArray + struct.set(PidEntriesField, entriesArray) + + val buffer = ByteBuffer.allocate(struct.sizeOf) + struct.writeTo(buffer) + buffer.flip() + + // now fill in the CRC + val crc = Crc32C.compute(buffer, PidEntriesOffset, buffer.limit - PidEntriesOffset) + ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc) + + val fos = new FileOutputStream(file) + try { + fos.write(buffer.array, buffer.arrayOffset, buffer.limit) + } finally { + fos.close() + } + } + + private def verifyFileName(name: String): Boolean = FilenamePattern.findFirstIn(name).isDefined + + private def offsetFromFile(file: File): Long = { + s"${file.getName.replace(s".$FilenameSuffix", "")}".toLong + } + + private def formatFileName(lastOffset: Long): String = { + // The files will be named '$lastOffset.snapshot' and located in 'logDir/pid-mapping' + s"$lastOffset.$FilenameSuffix" + } + +} + +/** + * Maintains a mapping from ProducerIds (PIDs) to metadata about the last appended entries (e.g. + * epoch, sequence number, last offset, etc.) + * + * The sequence number is the last number successfully appended to the partition for the given identifier. + * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message + * appended to the partition. + */ +@nonthreadsafe +class ProducerIdMapping(val config: LogConfig, + val topicPartition: TopicPartition, + val snapParentDir: File, + val maxPidExpirationMs: Int) extends Logging { + import ProducerIdMapping._ + + val snapDir: File = new File(snapParentDir, DirnamePrefix) + snapDir.mkdir() + + private val pidMap = mutable.Map[Long, ProducerIdEntry]() + private var lastMapOffset = 0L + private var lastSnapOffset = 0L + + /** + * Returns the last offset of this map + */ + def mapEndOffset = lastMapOffset + + /** + * Get a copy of the active producers + */ + def activePids: immutable.Map[Long, ProducerIdEntry] = pidMap.toMap + + /** + * Load a snapshot of the id mapping or return empty maps + * in the case the snapshot doesn't exist (first time). 
+ */ + private def loadFromSnapshot(logEndOffset: Long, checkNotExpired:(ProducerIdEntry) => Boolean) { + pidMap.clear() + + var loaded = false + while (!loaded) { + lastSnapshotFile(logEndOffset) match { + case Some(file) => + try { + loadSnapshot(file, pidMap, checkNotExpired) + lastSnapOffset = offsetFromFile(file) + lastMapOffset = lastSnapOffset + loaded = true + } catch { + case e: CorruptSnapshotException => + error(s"Snapshot file at ${file} is corrupt: ${e.getMessage}") + file.delete() + } + case None => + lastSnapOffset = 0L + lastMapOffset = 0L + snapDir.mkdir() + loaded = true + } + } + } + + def isEntryValid(currentTimeMs: Long, producerIdEntry: ProducerIdEntry) : Boolean = { + currentTimeMs - producerIdEntry.timestamp < maxPidExpirationMs + } + + def checkForExpiredPids(currentTimeMs: Long) { + pidMap.retain { case (pid, lastEntry) => + isEntryValid(currentTimeMs, lastEntry) + } + } + + def truncateAndReload(logEndOffset: Long, currentTime: Long) { + truncateSnapshotFiles(logEndOffset) + def checkNotExpired = (producerIdEntry: ProducerIdEntry) => { isEntryValid(currentTime, producerIdEntry) } + loadFromSnapshot(logEndOffset, checkNotExpired) + } + + /** + * Update the mapping with the given append information + */ + def update(appendInfo: ProducerAppendInfo): Unit = { + if (appendInfo.pid == RecordBatch.NO_PRODUCER_ID) + throw new IllegalArgumentException("Invalid PID passed to update") + val entry = appendInfo.lastEntry + pidMap.put(appendInfo.pid, entry) + lastMapOffset = entry.lastOffset + 1 + } + + /** + * Load a previously stored PID entry into the cache. Ignore the entry if the timestamp is older + * than the current time minus the PID expiration time (i.e. if the PID has expired). + */ + def load(pid: Long, entry: ProducerIdEntry, currentTimeMs: Long) { + if (pid != RecordBatch.NO_PRODUCER_ID && currentTimeMs - entry.timestamp < maxPidExpirationMs) { + pidMap.put(pid, entry) + lastMapOffset = entry.lastOffset + 1 + } + } + + /** + * Get the last written entry for the given PID. + */ + def lastEntry(pid: Long): Option[ProducerIdEntry] = pidMap.get(pid) + + /** + * Serialize and write the bytes to a file. The file name is a concatenation of: + * - offset + * - a ".snapshot" suffix + * + * The snapshot files are located in the logDirectory, inside a 'pid-mapping' sub directory. + */ + def maybeTakeSnapshot() { + // If not a new offset, then it is not worth taking another snapshot + if (lastMapOffset > lastSnapOffset) { + val file = new File(snapDir, formatFileName(lastMapOffset)) + writeSnapshot(file, pidMap) + + // Update the last snap offset according to the serialized map + lastSnapOffset = lastMapOffset + + maybeRemove() + } + } + + /** + * When we remove the head of the log due to retention, we need to + * clean up the id map. This method takes the new start offset and + * expires all ids that have a smaller offset. 
+ * + * @param startOffset New start offset for the log associated to + * this id map instance + */ + def cleanFrom(startOffset: Long) { + pidMap.retain((pid, entry) => entry.firstOffset >= startOffset) + if (pidMap.isEmpty) + lastMapOffset = -1L + } + + private def maybeRemove() { + val list = listSnapshotFiles() + if (list.size > maxPidSnapshotsToRetain) { + // Get file with the smallest offset + val toDelete = list.minBy(offsetFromFile) + // Delete the last + toDelete.delete() + } + } + + private def listSnapshotFiles(): List[File] = { + if (snapDir.exists && snapDir.isDirectory) + snapDir.listFiles.filter(f => f.isFile && verifyFileName(f.getName)).toList + else + List.empty[File] + } + + /** + * Returns the last valid snapshot with offset smaller than the base offset provided as + * a constructor parameter for loading. + */ + private def lastSnapshotFile(maxOffset: Long): Option[File] = { + val files = listSnapshotFiles() + if (files != null && files.nonEmpty) { + val targetOffset = files.foldLeft(0L) { (accOffset, file) => + val snapshotLastOffset = offsetFromFile(file) + if ((maxOffset >= snapshotLastOffset) && (snapshotLastOffset > accOffset)) + snapshotLastOffset + else + accOffset + } + val snap = new File(snapDir, formatFileName(targetOffset)) + if (snap.exists) + Some(snap) + else + None + } else + None + } + + private def truncateSnapshotFiles(maxOffset: Long) { + listSnapshotFiles().foreach { file => + val snapshotLastOffset = offsetFromFile(file) + if (snapshotLastOffset >= maxOffset) + file.delete() + } + } +}
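[Editor's note] The new ProducerIdMapping/ProducerAppendInfo pair above gives each log a per-PID view of the last appended batch, which is what rejects out-of-order or duplicate sequence numbers and fences stale epochs. The commit wires this into Log.scala elsewhere in the patch; the sketch below is only an illustration of how an append path could drive this API, under the assumption that it runs inside package kafka.log (the classes are private[log]). The object name and method parameters are placeholders, not part of the commit.

package kafka.log

import org.apache.kafka.common.record.RecordBatch

object ProducerIdMappingSketch {
  def appendWithPidValidation(idMapping: ProducerIdMapping, batch: RecordBatch): Unit = {
    val pid = batch.producerId
    val lastEntry = idMapping.lastEntry(pid).getOrElse(ProducerIdEntry.Empty)

    // A retry of exactly the batch that was already appended is simply ignored.
    if (lastEntry.isDuplicate(batch))
      return

    // validateAppend (invoked from append) throws ProducerFencedException,
    // DuplicateSequenceNumberException or OutOfOrderSequenceException on violations.
    val appendInfo = new ProducerAppendInfo(pid, lastEntry)
    appendInfo.append(batch)

    // ... the batch would be physically appended to the log segment here ...

    // Publish the updated entry and advance mapEndOffset past the appended batch.
    idMapping.update(appendInfo)

    // Persist the map if it has advanced since the last snapshot (e.g. on log roll);
    // truncateAndReload() uses these '<offset>.snapshot' files during recovery.
    idMapping.maybeTakeSnapshot()
  }
}

On recovery, lastSnapshotFile picks the newest snapshot whose offset does not exceed the log end offset, and maybeRemove keeps only maxPidSnapshotsToRetain (2) snapshot files per partition, so the pid-mapping directory stays small while still allowing truncation to an older point.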
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index defbf34..600b84d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -18,7 +18,7 @@ package kafka.server import java.nio.ByteBuffer -import java.lang.{Long => JLong, Short => JShort} +import java.lang.{Long => JLong} import java.util.{Collections, Properties} import java.util @@ -28,7 +28,7 @@ import kafka.cluster.Partition import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.common._ import kafka.controller.KafkaController -import kafka.coordinator.{GroupCoordinator, JoinGroupResult} +import kafka.coordinator.{GroupCoordinator, InitPidResult, JoinGroupResult, TransactionCoordinator} import kafka.log._ import kafka.network._ import kafka.network.RequestChannel.{Response, Session} @@ -49,6 +49,7 @@ import org.apache.kafka.common.requests.SaslHandshakeResponse import scala.collection._ import scala.collection.JavaConverters._ +import scala.util.Random /** * Logic to handle the various Kafka requests @@ -56,7 +57,8 @@ import scala.collection.JavaConverters._ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val adminManager: AdminManager, - val coordinator: GroupCoordinator, + val groupCoordinator: GroupCoordinator, + val txnCoordinator: TransactionCoordinator, val controller: KafkaController, val zkUtils: ZkUtils, val brokerId: Int, @@ -100,6 +102,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request) case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request) case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request) + case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -138,11 +141,11 @@ class KafkaApis(val requestChannel: RequestChannel, // leadership changes updatedLeaders.foreach { partition => if (partition.topic == Topic.GroupMetadataTopicName) - coordinator.handleGroupImmigration(partition.partitionId) + groupCoordinator.handleGroupImmigration(partition.partitionId) } updatedFollowers.foreach { partition => if (partition.topic == Topic.GroupMetadataTopicName) - coordinator.handleGroupEmigration(partition.partitionId) + groupCoordinator.handleGroupEmigration(partition.partitionId) } } @@ -181,7 +184,7 @@ class KafkaApis(val requestChannel: RequestChannel, // is not cleared. 
result.foreach { case (topicPartition, error) => if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) { - coordinator.handleGroupEmigration(topicPartition.partition) + groupCoordinator.handleGroupEmigration(topicPartition.partition) } } new StopReplicaResponse(error, result.asJava) @@ -202,7 +205,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest) if (deletedPartitions.nonEmpty) - coordinator.handleDeletedPartitions(deletedPartitions) + groupCoordinator.handleDeletedPartitions(deletedPartitions) if (adminManager.hasDelayedTopicOperations) { updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic => @@ -305,7 +308,7 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetRetention = if (header.apiVersion <= 1 || offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME) - coordinator.offsetConfig.offsetsRetentionMs + groupCoordinator.offsetConfig.offsetsRetentionMs else offsetCommitRequest.retentionTime @@ -332,7 +335,7 @@ class KafkaApis(val requestChannel: RequestChannel, } // call coordinator to handle commit offset - coordinator.handleCommitOffsets( + groupCoordinator.handleCommitOffsets( offsetCommitRequest.groupId, offsetCommitRequest.memberId, offsetCommitRequest.generationId, @@ -792,7 +795,7 @@ class KafkaApis(val requestChannel: RequestChannel, java.util.Collections.emptyList()) } else { createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions, - config.offsetsTopicReplicationFactor.toInt, coordinator.offsetsTopicConfigs) + config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs) } } @@ -946,7 +949,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // versions 1 and above read offsets from Kafka if (offsetFetchRequest.isAllPartitions) { - val (error, allPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId) + val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId) if (error != Errors.NONE) offsetFetchRequest.getErrorResponse(error) else { @@ -957,7 +960,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else { val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala .partition(authorizeTopicDescribe) - val (error, authorizedPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, + val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId, Some(authorizedPartitions)) if (error != Errors.NONE) offsetFetchRequest.getErrorResponse(error) @@ -980,7 +983,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode) requestChannel.sendResponse(new RequestChannel.Response(request, responseBody)) } else { - val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId) + val partition = groupCoordinator.partitionFor(groupCoordinatorRequest.groupId) // get metadata (and create the topic if necessary) val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName) @@ -1013,7 +1016,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, Describe, new Resource(Group, groupId))) { groupId -> 
DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED) } else { - val (error, summary) = coordinator.handleDescribeGroup(groupId) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) val members = summary.members.map { member => val metadata = ByteBuffer.wrap(member.metadata) val assignment = ByteBuffer.wrap(member.assignment) @@ -1032,7 +1035,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) { ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED) } else { - val (error, groups) = coordinator.handleListGroups() + val (error, groups) = groupCoordinator.handleListGroups() val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } new ListGroupsResponse(error, allGroups.asJava) } @@ -1066,7 +1069,7 @@ class KafkaApis(val requestChannel: RequestChannel, // let the coordinator to handle join-group val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol => (protocol.name, Utils.toArray(protocol.metadata))).toList - coordinator.handleJoinGroup( + groupCoordinator.handleJoinGroup( joinGroupRequest.groupId, joinGroupRequest.memberId, request.header.clientId, @@ -1090,7 +1093,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) { sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED) } else { - coordinator.handleSyncGroup( + groupCoordinator.handleSyncGroup( syncGroupRequest.groupId(), syncGroupRequest.generationId(), syncGroupRequest.memberId(), @@ -1117,7 +1120,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else { // let the coordinator to handle heartbeat - coordinator.handleHeartbeat( + groupCoordinator.handleHeartbeat( heartbeatRequest.groupId(), heartbeatRequest.memberId(), heartbeatRequest.groupGenerationId(), @@ -1141,7 +1144,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new Response(request, leaveGroupResponse)) } else { // let the coordinator to handle leave-group - coordinator.handleLeaveGroup( + groupCoordinator.handleLeaveGroup( leaveGroupRequest.groupId(), leaveGroupRequest.memberId(), sendResponseCallback) @@ -1308,6 +1311,17 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleInitPidRequest(request: RequestChannel.Request): Unit = { + val initPidRequest = request.body[InitPidRequest] + // Send response callback + def sendResponseCallback(result: InitPidResult): Unit = { + val responseBody: InitPidResponse = new InitPidResponse(result.error, result.pid, result.epoch) + trace(s"InitPidRequest : Generated new PID ${result.pid} from InitPidRequest from client ${request.header.clientId}") + requestChannel.sendResponse(new RequestChannel.Response(request, responseBody)) + } + txnCoordinator.handleInitPid(initPidRequest.transactionalId, sendResponseCallback) + } + def authorizeClusterAction(request: RequestChannel.Request): Unit = { if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index fe6631e..0f2205f 100755 --- 
a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -33,8 +33,8 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.record.TimestampType -import scala.collection.Map import scala.collection.JavaConverters._ +import scala.collection.Map object Defaults { /** ********* Zookeeper Configuration ***********/ @@ -163,10 +163,14 @@ object Defaults { val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples val ReplicationQuotaWindowSizeSeconds: Int = ReplicationQuotaManagerConfig.DefaultQuotaWindowSizeSeconds + /** ********* Transaction Configuration ***********/ + val TransactionalIdExpirationMsDefault = 604800000 + val DeleteTopicEnable = false val CompressionType = "producer" + val MaxIdMapSnapshots = 2 /** ********* Kafka Metrics Configuration ***********/ val MetricNumSamples = 2 val MetricSampleWindowMs = 30000 @@ -194,7 +198,6 @@ object Defaults { val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN val SaslKerberosPrincipalToLocalRules = SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES - } object KafkaConfig { @@ -280,6 +283,7 @@ object KafkaConfig { val LogMessageFormatVersionProp = LogConfigPrefix + "message.format.version" val LogMessageTimestampTypeProp = LogConfigPrefix + "message.timestamp.type" val LogMessageTimestampDifferenceMaxMsProp = LogConfigPrefix + "message.timestamp.difference.max.ms" + val LogMaxIdMapSnapshotsProp = LogConfigPrefix + "max.id.map.snapshots" val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas" @@ -332,6 +336,8 @@ object KafkaConfig { val NumReplicationQuotaSamplesProp = "replication.quota.window.num" val QuotaWindowSizeSecondsProp = "quota.window.size.seconds" val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds" + /** ********* Transaction Configuration **********/ + val TransactionalIdExpirationMsProp = "transactional.id.expiration.ms" val DeleteTopicEnableProp = "delete.topic.enable" val CompressionTypeProp = "compression.type" @@ -568,6 +574,11 @@ object KafkaConfig { val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas" val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas" val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas" + /** ********* Transaction Configuration ***********/ + val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " + + "transaction coordinator. Note that this also influences PID expiration: PIDs are guaranteed to expire " + + "after expiration of this timeout from the last write by the PID (they may expire sooner if the last write " + + "from the PID is deleted due to the topic's retention settings)." val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. 
This configuration accepts the standard compression codecs " + @@ -763,6 +774,9 @@ object KafkaConfig { .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc) + /** ********* Transaction configuration ***********/ + .define(TransactionalIdExpirationMsProp, INT, Defaults.TransactionalIdExpirationMsDefault, atLeast(1), LOW, TransactionIdExpirationMsDoc) + /** ********* SSL Configuration ****************/ .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc) .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc) @@ -989,6 +1003,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp) val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp) + /** ********* Transaction Configuration **************/ + val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp) + val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp) val compressionType = getString(KafkaConfig.CompressionTypeProp) val listeners: Seq[EndPoint] = getListeners http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 0d3e49c..e63a6d2 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -29,7 +29,7 @@ import kafka.api.KAFKA_0_9_0 import kafka.cluster.Broker import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException} import kafka.controller.{ControllerStats, KafkaController} -import kafka.coordinator.GroupCoordinator +import kafka.coordinator.{GroupCoordinator, TransactionCoordinator} import kafka.log.{CleanerConfig, LogConfig, LogManager} import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter} import kafka.network.{BlockingChannel, SocketServer} @@ -37,7 +37,7 @@ import kafka.security.CredentialProvider import kafka.security.auth.Authorizer import kafka.utils._ import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} import org.apache.kafka.common.internals.ClusterResourceListeners import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _} import org.apache.kafka.common.network._ @@ -122,6 +122,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP var groupCoordinator: GroupCoordinator = null + var transactionCoordinator: TransactionCoordinator = null + var kafkaController: KafkaController = null val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) @@ -205,7 +207,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala) /* start log manager */ - logManager = createLogManager(zkUtils.zkClient, brokerState) + logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time) logManager.startup() 
metadataCache = new MetadataCache(config.brokerId) @@ -229,6 +231,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM) groupCoordinator.startup() + /* start transaction coordinator */ + // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue + transactionCoordinator = TransactionCoordinator(config, zkUtils, Time.SYSTEM) + transactionCoordinator.startup() + /* Get the authorizer and initialize it if one is specified.*/ authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName => val authZ = CoreUtils.createObject[Authorizer](authorizerClassName) @@ -237,9 +244,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP } /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, - kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, - clusterId, time) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, + kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId, time) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, config.numIoThreads) @@ -403,8 +409,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP while (!shutdownSucceeded && remainingRetries > 0) { remainingRetries = remainingRetries - 1 - import NetworkClientBlockingOps._ - // 1. Find the controller and establish a connection to it. // Get the current controller info. 
This is to ensure we use the most recent info to issue the @@ -431,14 +435,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP if (prevController != null) { try { - if (!networkClient.blockingReady(node(prevController), socketTimeoutMs)(time)) + if (!NetworkClientUtils.awaitReady(networkClient, node(prevController), time, socketTimeoutMs)) throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms") // send the controlled shutdown request val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId) val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest, time.milliseconds(), true) - val clientResponse = networkClient.blockingSendAndReceive(request)(time) + val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time) val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse] if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining.isEmpty) { @@ -633,36 +637,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName) - private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { - val defaultProps = KafkaServer.copyKafkaConfigToLog(config) - val defaultLogConfig = LogConfig(defaultProps) - - val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) => - topic -> LogConfig.fromProps(defaultProps, configs) - } - // read the log configurations from zookeeper - val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, - dedupeBufferSize = config.logCleanerDedupeBufferSize, - dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor, - ioBufferSize = config.logCleanerIoBufferSize, - maxMessageSize = config.messageMaxBytes, - maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond, - backOffMs = config.logCleanerBackoffMs, - enableCleaner = config.logCleanerEnable) - new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, - topicConfigs = configs, - defaultConfig = defaultLogConfig, - cleanerConfig = cleanerConfig, - ioThreads = config.numRecoveryThreadsPerDataDir, - flushCheckMs = config.logFlushSchedulerIntervalMs, - flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, - flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs, - retentionCheckMs = config.logCleanupIntervalMs, - scheduler = kafkaScheduler, - brokerState = brokerState, - time = time) - } - /** * Generates new brokerId if enabled or reads from meta.properties based on following conditions * <ol> http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5f055a6..cce59ce 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -27,7 +27,7 @@ import kafka.api.{FetchRequest => _, _} import kafka.common.KafkaStorageException import ReplicaFetcherThread._ import kafka.utils.Exit -import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.clients._ import 
org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector} import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse} @@ -248,14 +248,13 @@ class ReplicaFetcherThread(name: String, } private def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse = { - import kafka.utils.NetworkClientBlockingOps._ try { - if (!networkClient.blockingReady(sourceNode, socketTimeout)(time)) + if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout)) throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms") else { val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder, time.milliseconds(), true) - networkClient.blockingSendAndReceive(clientRequest)(time) + NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time) } } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala deleted file mode 100644 index 0370564..0000000 --- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import java.io.IOException - -import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient} -import org.apache.kafka.common.Node -import org.apache.kafka.common.requests.AbstractRequest -import org.apache.kafka.common.utils.Time - -import scala.annotation.tailrec -import scala.collection.JavaConverters._ - -object NetworkClientBlockingOps { - implicit def networkClientBlockingOps(client: NetworkClient): NetworkClientBlockingOps = - new NetworkClientBlockingOps(client) -} - -/** - * Provides extension methods for `NetworkClient` that are useful for implementing blocking behaviour. Use with care. - * - * Example usage: - * - * {{{ - * val networkClient: NetworkClient = ... - * import NetworkClientBlockingOps._ - * networkClient.blockingReady(...) - * }}} - */ -class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { - - /** - * Checks whether the node is currently connected, first calling `client.poll` to ensure that any pending - * disconnects have been processed. - * - * This method can be used to check the status of a connection prior to calling `blockingReady` to be able - * to tell whether the latter completed a new connection. 
- */ - def isReady(node: Node)(implicit time: Time): Boolean = { - val currentTime = time.milliseconds() - client.poll(0, currentTime) - client.isReady(node, currentTime) - } - - /** - * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll` - * invocations until the connection to `node` is ready, the timeout expires or the connection fails. - * - * It returns `true` if the call completes normally or `false` if the timeout expires. If the connection fails, - * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive - * connection timeout, it is possible for this method to raise an `IOException` for a previous connection which - * has recently disconnected. - * - * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with - * care. - */ - def blockingReady(node: Node, timeout: Long)(implicit time: Time): Boolean = { - require(timeout >=0, "timeout should be >= 0") - - val startTime = time.milliseconds() - val expiryTime = startTime + timeout - - @tailrec - def awaitReady(iterationStartTime: Long): Boolean = { - if (client.isReady(node, iterationStartTime)) - true - else if (client.connectionFailed(node)) - throw new IOException(s"Connection to $node failed") - else { - val pollTimeout = expiryTime - iterationStartTime - client.poll(pollTimeout, iterationStartTime) - val afterPollTime = time.milliseconds() - if (afterPollTime < expiryTime) awaitReady(afterPollTime) - else false - } - } - - isReady(node) || client.ready(node, startTime) || awaitReady(startTime) - } - - /** - * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a - * disconnection happens (which can happen for a number of reasons including a request timeout). - * - * In case of a disconnection, an `IOException` is thrown. - * - * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with - * care. - */ - def blockingSendAndReceive(request: ClientRequest)(implicit time: Time): ClientResponse = { - client.send(request, time.milliseconds()) - pollContinuously { responses => - val response = responses.find { response => - response.requestHeader.correlationId == request.correlationId - } - response.foreach { r => - if (r.wasDisconnected) - throw new IOException(s"Connection to ${request.destination} was disconnected before the response was read") - else if (r.versionMismatch() != null) - throw r.versionMismatch(); - } - response - } - } - - /** - * Invokes `client.poll` until `collect` returns `Some`. The value inside `Some` is returned. - * - * Exceptions thrown via `collect` are not handled and will bubble up. - * - * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with - * care. 
- */ - private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: Time): T = { - - @tailrec - def recursivePoll: T = { - // rely on request timeout to ensure we don't block forever - val responses = client.poll(Long.MaxValue, time.milliseconds()).asScala - collect(responses) match { - case Some(result) => result - case None => recursivePoll - } - } - - recursivePoll - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index aa55479..6ff5c5f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -31,9 +31,6 @@ import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMars import org.I0Itec.zkclient.serialize.ZkSerializer import org.I0Itec.zkclient.{ZkClient, ZkConnection} import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.utils.Time import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.{ACL, Stat} @@ -65,6 +62,7 @@ object ZkUtils { val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election" val BrokerSequenceIdPath = s"$BrokersPath/seqid" val ConfigChangesPath = s"$ConfigPath/changes" + val PidBlockPath = "/latest_pid_block" // Important: it is necessary to add any new top level Zookeeper path to the Seq @@ -76,7 +74,8 @@ object ZkUtils { ControllerEpochPath, IsrChangeNotificationPath, KafkaAclPath, - KafkaAclChangesPath) + KafkaAclChangesPath, + PidBlockPath) def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = { val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout) @@ -217,7 +216,8 @@ class ZkUtils(val zkClient: ZkClient, getEntityConfigRootPath(ConfigType.Client), DeleteTopicsPath, BrokerSequenceIdPath, - IsrChangeNotificationPath) + IsrChangeNotificationPath, + PidBlockPath) val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure) @@ -529,12 +529,12 @@ class ZkUtils(val zkClient: ZkClient, case Some(checker) => checker(this, path, data) case _ => debug("Checker method is not passed skipping zkData match") - warn("Conditional update of path %s with data %s and expected version %d failed due to %s" + debug("Conditional update of path %s with data %s and expected version %d failed due to %s" .format(path, data,expectVersion, e1.getMessage)) (false, -1) } case e2: Exception => - warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, + debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, expectVersion, e2.getMessage)) (false, -1) } @@ -624,6 +624,20 @@ class ZkUtils(val zkClient: ZkClient, dataAndStat } + def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = { + val stat = new Stat() + try { + val data: String = zkClient.readData(path, stat) + if (data == null.asInstanceOf[String]) + (None, stat.getVersion) + else + (Some(data), stat.getVersion) + } catch { + case _: ZkNoNodeException => + (None, stat.getVersion) + } + } + def getChildren(path: 
String): Seq[String] = zkClient.getChildren(path).asScala def getChildrenParentMayNotExist(path: String): Seq[String] = { @@ -719,6 +733,14 @@ class ZkUtils(val zkClient: ZkClient, } } + def getTopicPartitionCount(topic: String): Option[Int] = { + val topicData = getPartitionAssignmentForTopics(Seq(topic)) + if (topicData(topic).nonEmpty) + Some(topicData(topic).size) + else + None + } + def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext] = { // read the partitions and their new replica list val jsonPartitionMapOpt = readDataMaybeNull(ReassignPartitionsPath)._1 http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 852377c..5aeeefe 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -14,6 +14,7 @@ package kafka.api import java.util.Properties +import java.util.concurrent.Future import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness @@ -24,11 +25,13 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.junit.Assert._ import org.junit.{After, Before, Test} +import scala.collection.mutable.ArrayBuffer + class ProducerBounceTest extends KafkaServerTestHarness { - private val producerBufferSize = 30000 + private val producerBufferSize = 65536 private val serverMessageMaxBytes = producerBufferSize/2 - val numServers = 2 + val numServers = 4 val overridingProps = new Properties() overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) @@ -36,7 +39,9 @@ class ProducerBounceTest extends KafkaServerTestHarness { // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) - + overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString) + overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString) + overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString) // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation @@ -47,31 +52,19 @@ class ProducerBounceTest extends KafkaServerTestHarness { // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems. 
override def generateConfigs() = { - FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = false) + FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true) .map(KafkaConfig.fromProps(_, overridingProps)) } - private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null - private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null - private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null - private val topic1 = "topic-1" @Before override def setUp() { super.setUp() - - producer1 = TestUtils.createNewProducer(brokerList, acks = 0, bufferSize = producerBufferSize) - producer2 = TestUtils.createNewProducer(brokerList, acks = 1, bufferSize = producerBufferSize) - producer3 = TestUtils.createNewProducer(brokerList, acks = -1, bufferSize = producerBufferSize) } @After override def tearDown() { - if (producer1 != null) producer1.close - if (producer2 != null) producer2.close - if (producer3 != null) producer3.close - super.tearDown() } @@ -81,19 +74,25 @@ class ProducerBounceTest extends KafkaServerTestHarness { @Test def testBrokerFailure() { val numPartitions = 3 - val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers) + val topicConfig = new Properties(); + topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString) + val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig) + assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) val scheduler = new ProducerScheduler() scheduler.start // rolling bounce brokers + for (_ <- 0 until numServers) { for (server <- servers) { + info("Shutting down server : %s".format(server.config.brokerId)) server.shutdown() server.awaitShutdown() + info("Server %s shut down. 
Starting it up again.".format(server.config.brokerId)) server.startup() - Thread.sleep(2000) + info("Restarted server: %s".format(server.config.brokerId)) } // Make sure the producer do not see any exception in returned metadata due to broker failures @@ -121,8 +120,9 @@ class ProducerBounceTest extends KafkaServerTestHarness { val messages = fetchResponses.flatMap(r => r.iterator.toList.map(_.message)) val uniqueMessages = messages.toSet val uniqueMessageSize = uniqueMessages.size - - assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize) + info(s"number of unique messages sent: ${uniqueMessageSize}") + assertEquals(s"Found ${messages.size - uniqueMessageSize} duplicate messages.", uniqueMessageSize, messages.size) + assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, messages.size) } private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) { @@ -130,26 +130,51 @@ class ProducerBounceTest extends KafkaServerTestHarness { var sent = 0 var failed = false - val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10) + val producerConfig = new Properties() + producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") + val producerConfigWithCompression = new Properties() + producerConfigWithCompression.putAll(producerConfig) + producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4") + val producers = List( + TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 4, retries = 10, props = Some(producerConfig)), + TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 2, retries = 10, lingerMs = 5000, props = Some(producerConfig)), + TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10, lingerMs = 10000, props = Some(producerConfigWithCompression)) + ) override def doWork(): Unit = { - val responses = - for (i <- sent+1 to sent+numRecords) - yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes), - new ErrorLoggingCallback(topic1, null, null, true)) - val futures = responses.toList + info("Starting to send messages..") + var producerId = 0 + val responses = new ArrayBuffer[IndexedSeq[Future[RecordMetadata]]]() + for (producer <- producers) { + val response = + for (i <- sent+1 to sent+numRecords) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, ((producerId + 1) * i).toString.getBytes), + new ErrorLoggingCallback(topic1, null, null, true)) + responses.append(response) + producerId += 1 + } try { - futures.map(_.get) - sent += numRecords + for (response <- responses) { + val futures = response.toList + futures.map(_.get) + sent += numRecords + } + info(s"Sent $sent records") } catch { - case _ : Exception => failed = true + case e : Exception => + error(s"Got exception ${e.getMessage}") + e.printStackTrace() + failed = true } } override def shutdown(){ super.shutdown() - producer.close + for (producer <- producers) { + producer.close() + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala index 8a198eb..61199c2 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -79,14 +79,11 @@ class GroupCoordinatorResponseTest extends JUnitSuite { props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString) props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString) - // make two partitions of the group topic to make sure some partitions are not owned by the coordinator - val ret = mutable.Map[String, Map[Int, Seq[Int]]]() - ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1))) - replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) - EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret) + // make two partitions of the group topic to make sure some partitions are not owned by the coordinator + EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2)) EasyMock.replay(zkUtils) timer = new MockTimer http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala index 6b1abf3..9d38485 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala @@ -69,11 +69,8 @@ class GroupMetadataManagerTest { offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) // make two partitions of the group topic to make sure some partitions are not owned by the coordinator - val ret = mutable.Map[String, Map[Int, Seq[Int]]]() - ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1))) - zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) - EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(Topic.GroupMetadataTopicName))).andReturn(ret) + EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2)) EasyMock.replay(zkUtils) time = new MockTime http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala new file mode 100644 index 0000000..da9ec47 --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.common.KafkaException +import kafka.utils.ZkUtils +import org.easymock.{Capture, EasyMock, IAnswer} +import org.junit.{After, Test} +import org.junit.Assert._ + +class ProducerIdManagerTest { + + val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) + + @After + def tearDown(): Unit = { + EasyMock.reset(zkUtils) + } + + @Test + def testGetPID() { + var zkVersion: Int = -1 + var data: String = null + EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString())) + .andAnswer(new IAnswer[(Option[String], Int)] { + override def answer(): (Option[String], Int) = { + if (zkVersion == -1) { + (None.asInstanceOf[Option[String]], 0) + } else { + (Some(data), zkVersion) + } + } + }) + .anyTimes() + + val capturedVersion: Capture[Int] = EasyMock.newCapture() + val capturedData: Capture[String] = EasyMock.newCapture() + EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(), + EasyMock.capture(capturedData), + EasyMock.capture(capturedVersion), + EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]])) + .andAnswer(new IAnswer[(Boolean, Int)] { + override def answer(): (Boolean, Int) = { + zkVersion = capturedVersion.getValue + 1 + data = capturedData.getValue + + (true, zkVersion) + } + }) + .anyTimes() + + EasyMock.replay(zkUtils) + + val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils) + val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils) + + val pid1 = manager1.nextPid() + val pid2 = manager2.nextPid() + + assertEquals(0, pid1) + assertEquals(ProducerIdManager.PidBlockSize, pid2) + + for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) { + assertEquals(pid1 + i, manager1.nextPid()) + } + + for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) { + assertEquals(pid2 + i, manager2.nextPid()) + } + + assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid()) + assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid()) + } + + @Test(expected = classOf[KafkaException]) + def testExceedPIDLimit() { + EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString())) + .andAnswer(new IAnswer[(Option[String], Int)] { + override def answer(): (Option[String], Int) = { + (Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0, + Long.MaxValue - ProducerIdManager.PidBlockSize, + Long.MaxValue))), 0) + } + }) + .anyTimes() + EasyMock.replay(zkUtils) + new ProducerIdManager(0, zkUtils) + } +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala new file mode 100644 index 0000000..f8ef5dc --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.utils.ZkUtils +import org.apache.kafka.common.protocol.Errors +import org.easymock.{Capture, EasyMock, IAnswer} +import org.junit.{After, Before, Test} +import org.junit.Assert._ + +class TransactionCoordinatorTest { + + val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) + + var zkVersion: Int = -1 + var data: String = null + val capturedVersion: Capture[Int] = EasyMock.newCapture() + val capturedData: Capture[String] = EasyMock.newCapture() + EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString())) + .andAnswer(new IAnswer[(Option[String], Int)] { + override def answer(): (Option[String], Int) = { + if (zkVersion == -1) { + (None.asInstanceOf[Option[String]], 0) + } else { + (Some(data), zkVersion) + } + } + }) + .anyTimes() + + EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(), + EasyMock.capture(capturedData), + EasyMock.capture(capturedVersion), + EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]])) + .andAnswer(new IAnswer[(Boolean, Int)] { + override def answer(): (Boolean, Int) = { + zkVersion = capturedVersion.getValue + 1 + data = capturedData.getValue + + (true, zkVersion) + } + }) + .anyTimes() + + EasyMock.replay(zkUtils) + + val pidManager: ProducerIdManager = new ProducerIdManager(0, zkUtils) + val coordinator: TransactionCoordinator = new TransactionCoordinator(0, pidManager) + + var result: InitPidResult = null + + @Before + def setUp(): Unit = { + coordinator.startup() + } + + @After + def tearDown(): Unit = { + EasyMock.reset(zkUtils) + coordinator.shutdown() + } + + @Test + def testHandleInitPid() = { + coordinator.handleInitPid("", initPidMockCallback) + assertEquals(InitPidResult(0L, 0, Errors.NONE), result) + + coordinator.handleInitPid("", initPidMockCallback) + assertEquals(InitPidResult(1L, 0, Errors.NONE), result) + + coordinator.handleInitPid(null, initPidMockCallback) + assertEquals(InitPidResult(2L, 0, Errors.NONE), result) + } + + def initPidMockCallback(ret: InitPidResult): Unit = { + result = ret + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 5f97708..49faa85 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -53,7 +53,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin val logProps = new Properties() logProps.put(LogConfig.CompressionTypeProp, 
brokerCompression) /*configure broker-side compression */ - val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time) /* append two messages */ log.append(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 3e91f96..2104842 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -322,7 +322,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) { val dir = new File(logDir, "log-" + i) dir.mkdirs() - val log = new Log(dir = dir, + val log = new Log(dir, LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)), logStartOffset = 0L, recoveryPoint = 0L, http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala index 05d9060..2cfcc07 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala @@ -149,7 +149,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float) - val log = new Log(dir = dir, + val log = new Log(dir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 94207ec..e933c87 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -185,8 +185,8 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { config, logStartOffset = 0L, recoveryPoint = 0L, - time.scheduler, - time) + scheduler = time.scheduler, + time = time) log } http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 38eb94c..928b03d 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -174,6 +174,27 @@ class LogCleanerTest extends JUnitSuite { } @Test + def testLogCleanerRetainsLastWrittenRecordForEachPid(): Unit = { + val cleaner = makeCleaner(10) + val logProps = new 
Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + log.append(record(0, 0)) // offset 0 + log.append(record(0, 1, pid = 1, epoch = 0, sequence = 0)) // offset 1 + log.append(record(0, 2, pid = 2, epoch = 0, sequence = 0)) // offset 2 + log.append(record(0, 3, pid = 3, epoch = 0, sequence = 0)) // offset 3 + log.append(record(1, 1, pid = 2, epoch = 0, sequence = 1)) // offset 4 + + // roll the segment, so we can clean the messages already appended + log.roll() + + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset)) + assertEquals(immutable.List(0, 0, 1), keysInLog(log)) + assertEquals(immutable.List(1, 3, 4), offsetsInLog(log)) + } + + @Test def testPartialSegmentClean(): Unit = { // because loadFactor is 0.75, this means we can fit 2 messages in the map val cleaner = makeCleaner(2) @@ -796,8 +817,12 @@ class LogCleanerTest extends JUnitSuite { def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes) - def record(key: Int, value: Int): MemoryRecords = - record(key, value.toString.getBytes) + + def record(key: Int, value: Int, pid: Long = RecordBatch.NO_PRODUCER_ID, epoch: Short = RecordBatch.NO_PRODUCER_EPOCH, + sequence: Int = RecordBatch.NO_SEQUENCE): MemoryRecords = { + MemoryRecords.withRecords(0L, CompressionType.NONE, pid, epoch, sequence, + new SimpleRecord(key.toString.getBytes, value.toString.getBytes)) + } def record(key: Int, value: Array[Byte]) = TestUtils.singletonRecords(key = key.toString.getBytes, value = value) http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index a8e953a..1400615 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -102,7 +102,7 @@ class LogManagerTest { time.sleep(maxLogAgeMs + 1) assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) - assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length) + assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).records.sizeInBytes) try { @@ -148,7 +148,7 @@ class LogManagerTest { time.sleep(logManager.InitialTaskDelayMs) assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) - assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length) + assertEquals("Files should have been deleted", log.numberOfSegments * 3 + 1, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).records.sizeInBytes) try { log.read(0, 1024)
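
The new testLogCleanerRetainsLastWrittenRecordForEachPid case in LogCleanerTest above appends records for several producer ids and asserts that offsets 1, 3 and 4 survive cleaning: offset 1 (pid 1's last record) is retained even though key 0 was overwritten at offset 3, while offset 2 (pid 2) can be discarded because pid 2 also wrote at offset 4. Below is a minimal, self-contained sketch of that retention rule under the assumption that the cleaner keeps both the latest offset per key and the last offset written by each producer id; Record, retainedOffsets and CleanerRetentionSketch are hypothetical names for illustration, not the real LogCleaner API.

// Illustrative sketch only: models which offsets a compacting cleaner keeps if it
// retains (a) the latest offset per key and (b) the last offset per producer id.
object CleanerRetentionSketch {
  final case class Record(offset: Long, key: Int, pid: Option[Long])

  def retainedOffsets(records: Seq[Record]): Set[Long] = {
    // latest offset observed for each key (standard log compaction rule)
    val latestPerKey = records.groupBy(_.key).values.map(rs => rs.map(_.offset).max)
    // last offset written by each producer id, kept regardless of key overwrites
    val lastPerPid = records.flatMap(r => r.pid.map(p => (p, r.offset)))
      .groupBy(_._1).values.map(rs => rs.map(_._2).max)
    (latestPerKey ++ lastPerPid).toSet
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the appends in testLogCleanerRetainsLastWrittenRecordForEachPid above
    val records = Seq(
      Record(0L, 0, None),     // offset 0, key 0, no pid
      Record(1L, 0, Some(1L)), // offset 1, key 0, pid 1
      Record(2L, 0, Some(2L)), // offset 2, key 0, pid 2
      Record(3L, 0, Some(3L)), // offset 3, key 0, pid 3
      Record(4L, 1, Some(2L))  // offset 4, key 1, pid 2
    )
    println(retainedOffsets(records).toList.sorted) // List(1, 3, 4), matching offsetsInLog in the test
  }
}
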

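Earlier in this commit, ProducerIdManagerTest drives two ProducerIdManager instances against a mocked ZkUtils and expects them to claim disjoint blocks of ProducerIdManager.PidBlockSize ids, with a KafkaException once the id space is exhausted (testExceedPIDLimit). The following toy, in-memory sketch shows only the block-allocation arithmetic those assertions rely on; BlockStore, ToyPidManager and BlockSize = 1000 are stand-ins, since the real manager persists the block boundary in ZooKeeper via conditionalUpdatePersistentPath.

import java.util.concurrent.atomic.AtomicLong

// Toy model of block-based producer id allocation. The AtomicLong plays the role of
// the conditionally-updated znode that records the end of the last allocated block.
object PidAllocationSketch {
  val BlockSize: Long = 1000L // stand-in for ProducerIdManager.PidBlockSize

  class BlockStore {
    private val nextBlockStart = new AtomicLong(0L)
    def claimBlock(): (Long, Long) = {
      val start = nextBlockStart.getAndAdd(BlockSize)
      if (start > Long.MaxValue - BlockSize)
        throw new IllegalStateException("producer id space exhausted") // analogous to testExceedPIDLimit
      (start, start + BlockSize - 1)
    }
  }

  class ToyPidManager(store: BlockStore) {
    private var block: (Long, Long) = store.claimBlock()
    private var nextId: Long = block._1
    def nextPid(): Long = {
      if (nextId > block._2) { // current block exhausted, claim the next free one
        block = store.claimBlock()
        nextId = block._1
      }
      val pid = nextId
      nextId += 1
      pid
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new BlockStore
    val m1 = new ToyPidManager(store) // claims [0, 999]
    val m2 = new ToyPidManager(store) // claims [1000, 1999]
    println(m1.nextPid()) // 0
    println(m2.nextPid()) // 1000
    (1L until BlockSize).foreach(_ => m1.nextPid()) // drain the rest of m1's block
    println(m1.nextPid()) // 2000: m1 moved on to the next free block
  }
}
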