Repository: kafka Updated Branches: refs/heads/trunk ce1cb329d -> 6d6c77a7a
http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index eddb47a..8854c3a 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -312,7 +312,7 @@ class LogSegment(val log: FileRecords, if (ms == null) { baseOffset } else { - ms.records.shallowEntries.asScala.toSeq.lastOption match { + ms.records.shallowEntries.asScala.lastOption match { case None => baseOffset case Some(last) => last.nextOffset } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/log/LogValidator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index d99c2ad..224a792 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -64,76 +64,8 @@ private[kafka] object LogValidator { assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType, messageTimestampDiffMaxMs) } else { - // Deal with compressed messages - // We cannot do in place assignment in one of the following situations: - // 1. Source and target compression codec are different - // 2. When magic value to use is 0 because offsets need to be overwritten - // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten. - // 4. Message format conversion is needed. - - // No in place assignment situation 1 and 2 - var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0 - - var maxTimestamp = Record.NO_TIMESTAMP - val expectedInnerOffset = new LongRef(0) - val validatedRecords = new mutable.ArrayBuffer[Record] - - records.deepEntries(true).asScala.foreach { logEntry => - val record = logEntry.record - validateKey(record, compactedTopic) - - if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) { - // No in place assignment situation 3 - // Validate the timestamp - validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs) - // Check if we need to overwrite offset - if (logEntry.offset != expectedInnerOffset.getAndIncrement()) - inPlaceAssignment = false - if (record.timestamp > maxTimestamp) - maxTimestamp = record.timestamp - } - - if (sourceCodec != NoCompressionCodec && logEntry.isCompressed) - throw new InvalidMessageException("Compressed outer record should not have an inner record with a " + - s"compression attribute set: $record") - - // No in place assignment situation 4 - if (record.magic != messageFormatVersion) - inPlaceAssignment = false - - validatedRecords += record.convert(messageFormatVersion) - } - - if (!inPlaceAssignment) { - val entries = validatedRecords.map(record => LogEntry.create(offsetCounter.getAndIncrement(), record)) - val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec), - now, entries.asJava) - builder.close() - val info = builder.info - - ValidationAndOffsetAssignResult( - validatedRecords = builder.build(), - maxTimestamp = info.maxTimestamp, - shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp, - messageSizeMaybeChanged = true) - } else { - // ensure the inner messages are valid - validatedRecords.foreach(_.ensureValid) - - // we can update the wrapper message only and write the compressed payload as is - val entry = records.shallowEntries.iterator.next() - val offset = offsetCounter.addAndGet(validatedRecords.size) - 1 - entry.setOffset(offset) - if (messageTimestampType == TimestampType.CREATE_TIME) - entry.setCreateTime(maxTimestamp) - else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) - entry.setLogAppendTime(now) - - ValidationAndOffsetAssignResult(validatedRecords = records, - maxTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp, - shallowOffsetOfMaxTimestamp = offset, - messageSizeMaybeChanged = false) - } + validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic, + messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs) } } @@ -159,11 +91,10 @@ private[kafka] object LogValidator { builder.convertAndAppendWithOffset(offsetCounter.getAndIncrement(), record) } - builder.close() + val convertedRecords = builder.build() val info = builder.info - ValidationAndOffsetAssignResult( - validatedRecords = builder.build(), + validatedRecords = convertedRecords, maxTimestamp = info.maxTimestamp, shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp, messageSizeMaybeChanged = true) @@ -210,6 +141,87 @@ private[kafka] object LogValidator { messageSizeMaybeChanged = false) } + /** + * We cannot do in place assignment in one of the following situations: + * 1. Source and target compression codec are different + * 2. When magic value to use is 0 because offsets need to be overwritten + * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten. + * 4. Message format conversion is needed. + */ + def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords, + offsetCounter: LongRef, + now: Long, + sourceCodec: CompressionCodec, + targetCodec: CompressionCodec, + compactedTopic: Boolean = false, + messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE, + messageTimestampType: TimestampType, + messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = { + // No in place assignment situation 1 and 2 + var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0 + + var maxTimestamp = Record.NO_TIMESTAMP + val expectedInnerOffset = new LongRef(0) + val validatedRecords = new mutable.ArrayBuffer[Record] + + records.deepEntries(true).asScala.foreach { logEntry => + val record = logEntry.record + validateKey(record, compactedTopic) + + if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) { + // Validate the timestamp + validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs) + // Check if we need to overwrite offset, no in place assignment situation 3 + if (logEntry.offset != expectedInnerOffset.getAndIncrement()) + inPlaceAssignment = false + if (record.timestamp > maxTimestamp) + maxTimestamp = record.timestamp + } + + if (sourceCodec != NoCompressionCodec && logEntry.isCompressed) + throw new InvalidMessageException("Compressed outer record should not have an inner record with a " + + s"compression attribute set: $record") + + // No in place assignment situation 4 + if (record.magic != messageFormatVersion) + inPlaceAssignment = false + + validatedRecords += record.convert(messageFormatVersion) + } + + if (!inPlaceAssignment) { + val entries = validatedRecords.map(record => LogEntry.create(offsetCounter.getAndIncrement(), record)) + val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec), + now, entries.asJava) + val updatedRecords = builder.build() + val info = builder.info + ValidationAndOffsetAssignResult( + validatedRecords = updatedRecords, + maxTimestamp = info.maxTimestamp, + shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp, + messageSizeMaybeChanged = true) + } else { + // ensure the inner messages are valid + validatedRecords.foreach(_.ensureValid) + + // we can update the wrapper message only and write the compressed payload as is + val entry = records.shallowEntries.iterator.next() + val offset = offsetCounter.addAndGet(validatedRecords.size) - 1 + entry.setOffset(offset) + + val shallowTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp + if (messageTimestampType == TimestampType.LOG_APPEND_TIME) + entry.setLogAppendTime(shallowTimestamp) + else if (messageTimestampType == TimestampType.CREATE_TIME) + entry.setCreateTime(shallowTimestamp) + + ValidationAndOffsetAssignResult(validatedRecords = records, + maxTimestamp = shallowTimestamp, + shallowOffsetOfMaxTimestamp = offset, + messageSizeMaybeChanged = false) + } + } + private def validateKey(record: Record, compactedTopic: Boolean) { if (compactedTopic && !record.hasKey) throw new InvalidMessageException("Compacted topic cannot accept message without key.") http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/message/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 175b7e9..e0efb3d 100755 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -99,8 +99,8 @@ object Message { def fromRecord(record: Record): Message = { - val wrapperTimestamp: Option[Long] = if (record.wrapperRecordTimestamp() == null) None else Some(record.wrapperRecordTimestamp()) - val wrapperTimestampType = Option(record.wrapperRecordTimestampType()) + val wrapperTimestamp: Option[Long] = if (record.wrapperRecordTimestamp == null) None else Some(record.wrapperRecordTimestamp) + val wrapperTimestampType = Option(record.wrapperRecordTimestampType) new Message(record.buffer, wrapperTimestamp, wrapperTimestampType) } } @@ -139,13 +139,9 @@ class Message(val buffer: ByteBuffer, import kafka.message.Message._ - private[message] def asRecord: Record = { - wrapperMessageTimestamp match { - case None => new Record(buffer) - case Some(timestamp) => - val timestampType = wrapperMessageTimestampType.orNull - new Record(buffer, timestamp, timestampType) - } + private[message] def asRecord: Record = wrapperMessageTimestamp match { + case None => new Record(buffer) + case Some(timestamp) => new Record(buffer, timestamp, wrapperMessageTimestampType.orNull) } /** @@ -227,7 +223,7 @@ class Message(val buffer: ByteBuffer, * Compute the checksum of the message from the message contents */ def computeChecksum: Long = - CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset) + Utils.computeChecksum(buffer, MagicOffset, buffer.limit - MagicOffset) /** * Retrieve the previously computed CRC for this message http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 1dbd373..ec25700 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -145,7 +145,7 @@ abstract class AbstractFetcherThread(name: String, case Errors.NONE => try { val records = partitionData.toRecords - val newOffset = records.shallowEntries.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse( + val newOffset = records.shallowEntries.asScala.lastOption.map(_.nextOffset).getOrElse( currentPartitionFetchState.offset) fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset) http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/DelayedOperationKey.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala index 0e05cce..1933339 100644 --- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala +++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala @@ -36,8 +36,6 @@ case class TopicPartitionOperationKey(topic: String, partition: Int) extends Del def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition) - def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) - override def keyLabel = "%s-%d".format(topic, partition) } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index bbddfae..d78021f 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -427,7 +427,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val controlledShutdownRequest = new ControlledShutdownRequest(config.brokerId) val request = new ClientRequest(node(prevController).idString, time.milliseconds(), true, requestHeader, controlledShutdownRequest, null) - val clientResponse = networkClient.blockingSendAndReceive(request, controlledShutdownRequest)(time) + val clientResponse = networkClient.blockingSendAndReceive(request)(time) val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse] if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) { http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/LogOffsetMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala index 7067b20..05e9842 100644 --- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -25,7 +25,7 @@ object LogOffsetMetadata { val UnknownFilePosition = -1 class OffsetOrdering extends Ordering[LogOffsetMetadata] { - override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = { + override def compare(x: LogOffsetMetadata, y: LogOffsetMetadata): Int = { x.offsetDiff(y).toInt } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index d5d7a13..3811be3 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -246,7 +246,7 @@ class ReplicaFetcherThread(name: String, throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms") else { val clientRequest = new ClientRequest(sourceBroker.id.toString, time.milliseconds(), true, header, request, null) - networkClient.blockingSendAndReceive(clientRequest, request)(time) + networkClient.blockingSendAndReceive(clientRequest)(time) } } catch { @@ -260,7 +260,7 @@ class ReplicaFetcherThread(name: String, private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = { val (request, apiVersion) = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) { - val partitions = Map(topicPartition -> java.lang.Long.valueOf(earliestOrLatest)) + val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long)) (new ListOffsetRequest(partitions.asJava, consumerId), 1) } else { val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)) http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 859a7c4..87b8d90 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -26,7 +26,6 @@ import kafka.cluster.{Partition, Replica} import kafka.common._ import kafka.controller.KafkaController import kafka.log.{LogAppendInfo, LogManager} -import kafka.message.InvalidMessageException import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.UnboundedQuota import kafka.utils._ @@ -217,7 +216,7 @@ class ReplicaManager(val config: KafkaConfig, def startup() { // start ISR expiration thread - // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x (1 + 50%) before it is removed from ISR + // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS) scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS) } @@ -418,8 +417,6 @@ class ReplicaManager(val config: KafkaConfig, _: RecordTooLargeException | _: RecordBatchTooLargeException | _: CorruptRecordException | - _: InvalidRecordException | - _: InvalidMessageException | _: InvalidTimestampException) => (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) case t: Throwable => @@ -667,7 +664,7 @@ class ReplicaManager(val config: KafkaConfig, val partitionState = new mutable.HashMap[Partition, PartitionState]() leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => val partition = getOrCreatePartition(topicPartition) - val partitionLeaderEpoch = partition.getLeaderEpoch() + val partitionLeaderEpoch = partition.getLeaderEpoch // If the leader epoch is valid record the epoch of the controller that made the leadership decision. // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path if (partitionLeaderEpoch < stateInfo.leaderEpoch) { @@ -831,16 +828,16 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower += partition else stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") + "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader") .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, - partition.topic, partition.partitionId, newLeaderBrokerId)) + partition.topicPartition, newLeaderBrokerId)) case None => // The leader broker should always be present in the metadata cache. // If not, we should record the error message and abort the transition process for this partition stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" + - " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.") + " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.") .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, - partition.topic, partition.partitionId, newLeaderBrokerId)) + partition.topicPartition, newLeaderBrokerId)) // Create the local replica even if the leader is unavailable. This is required to ensure that we include // the partition's high watermark in the checkpoint file (see KAFKA-1647) partition.getOrCreateReplica() @@ -858,22 +855,22 @@ class ReplicaManager(val config: KafkaConfig, (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset) }.toMap) partitionsToMakeFollower.foreach { partition => - val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId) + val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition) tryCompleteDelayedProduce(topicPartitionOperationKey) tryCompleteDelayedFetch(topicPartitionOperationKey) } partitionsToMakeFollower.foreach { partition => - stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + + stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, - partition.topic, partition.partitionId, correlationId, controllerId, epoch)) + partition.topicPartition, correlationId, controllerId, epoch)) } if (isShuttingDown.get()) { partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId, - controllerId, epoch, partition.topic, partition.partitionId)) + "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, + controllerId, epoch, partition.topicPartition)) } } else { @@ -886,8 +883,8 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " + - "%d epoch %d with correlation id %d for partition [%s,%d]") - .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId)) + "%d epoch %d with correlation id %d for partition %s") + .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) } } } catch { @@ -930,7 +927,7 @@ class ReplicaManager(val config: KafkaConfig, } private def getLeaderPartitions() : List[Partition] = { - allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList + allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList } // Flushes the highwatermark value for all partitions to the highwatermark file @@ -944,7 +941,7 @@ class ReplicaManager(val config: KafkaConfig, } catch { case e: IOException => fatal("Error writing to highwatermark file: ", e) - Runtime.getRuntime().halt(1) + Runtime.getRuntime.halt(1) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/utils/CoreUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 99b5aae..7fe9cc9 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -31,10 +31,8 @@ import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection._ import scala.collection.mutable import kafka.cluster.EndPoint -import org.apache.kafka.common.utils.Crc32 import org.apache.kafka.common.utils.Utils - /** * General helper functions! * @@ -130,26 +128,6 @@ object CoreUtils extends Logging { } /** - * Compute the CRC32 of the byte array - * @param bytes The array to compute the checksum for - * @return The CRC32 - */ - def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length) - - /** - * Compute the CRC32 of the segment of the byte array given by the specified size and offset - * @param bytes The bytes to checksum - * @param offset the offset at which to begin checksumming - * @param size the number of bytes to checksum - * @return The CRC32 - */ - def crc32(bytes: Array[Byte], offset: Int, size: Int): Long = { - val crc = new Crc32() - crc.update(bytes, offset, size) - crc.getValue() - } - - /** * Read some bytes into the provided buffer, and return the number of bytes read. If the * channel has been closed or we get -1 on the read for any reason, throw an EOFException */ http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala index e3d389b..62e7d94 100644 --- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala +++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala @@ -103,7 +103,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with * care. */ - def blockingSendAndReceive(request: ClientRequest, body: AbstractRequest)(implicit time: Time): ClientResponse = { + def blockingSendAndReceive(request: ClientRequest)(implicit time: Time): ClientResponse = { client.send(request, time.milliseconds()) pollContinuously { responses => http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/utils/Pool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 0cf6474..df74f29 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -33,9 +33,9 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { m.foreach(kv => pool.put(kv._1, kv._2)) } - def put(k: K, v: V) = pool.put(k, v) + def put(k: K, v: V): V = pool.put(k, v) - def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v) + def putIfNotExists(k: K, v: V): V = pool.putIfAbsent(k, v) /** * Gets the value associated with the given key. If there is no associated @@ -44,27 +44,40 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { * as lazy if its side-effects need to be avoided. * * @param key The key to lookup. - * @return The final value associated with the key. This may be different from - * the value created by the factory if another thread successfully - * put a value. + * @return The final value associated with the key. */ - def getAndMaybePut(key: K) = { + def getAndMaybePut(key: K): V = { if (valueFactory.isEmpty) throw new KafkaException("Empty value factory in pool.") - val curr = pool.get(key) - if (curr == null) { + getAndMaybePut(key, valueFactory.get(key)) + } + + /** + * Gets the value associated with the given key. If there is no associated + * value, then create the value using the provided by `createValue` and return the + * value associated with the key. + * + * @param key The key to lookup. + * @param createValue Factory function. + * @return The final value associated with the key. + */ + def getAndMaybePut(key: K, createValue: => V): V = { + val current = pool.get(key) + if (current == null) { createLock synchronized { - val curr = pool.get(key) - if (curr == null) - pool.put(key, valueFactory.get(key)) - pool.get(key) + val current = pool.get(key) + if (current == null) { + val value = createValue + pool.put(key, value) + value + } + else current } } - else - curr + else current } - def contains(id: K) = pool.containsKey(id) + def contains(id: K): Boolean = pool.containsKey(id) def get(key: K): V = pool.get(key) @@ -78,9 +91,9 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { def clear() { pool.clear() } - override def size = pool.size + override def size: Int = pool.size - override def iterator = new Iterator[(K,V)]() { + override def iterator: Iterator[(K, V)] = new Iterator[(K,V)]() { private val iter = pool.entrySet.iterator http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala index bb93cb4..6381447 100644 --- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala @@ -15,7 +15,6 @@ package integration.kafka.api import kafka.common.Topic import kafka.integration.KafkaServerTestHarness import kafka.log.Log -import kafka.message.GZIPCompressionCodec import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.OffsetAndMetadata @@ -27,11 +26,13 @@ import org.junit.Assert._ import scala.collection.JavaConverters._ import java.util.Properties +import org.apache.kafka.common.record.CompressionType + class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { - val offsetsTopicCompressionCodec = GZIPCompressionCodec + val offsetsTopicCompressionCodec = CompressionType.GZIP val overridingProps = new Properties() overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") - overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString) + overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.id.toString) override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { KafkaConfig.fromProps(_, overridingProps) @@ -55,8 +56,8 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { val logSegments = getGroupMetadataLogOpt.get.logSegments val incorrectCompressionCodecs = logSegments - .flatMap(_.log.shallowEntries.asScala.map(_.record.compressionType.id)) - .filter(_ != offsetsTopicCompressionCodec.codec) + .flatMap(_.log.shallowEntries.asScala.map(_.record.compressionType)) + .filter(_ != offsetsTopicCompressionCodec) assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs) consumer.close() http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 83280dc..ee556d7 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -29,8 +29,6 @@ import kafka.integration.KafkaServerTestHarness import org.junit.{After, Before} import scala.collection.mutable.Buffer -import scala.util.control.Breaks.{breakable, break} -import java.util.ConcurrentModificationException /** * A helper class for writing integration tests that involve producers, consumers, and servers http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index ef2b0af..b7d2fa1 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -18,7 +18,8 @@ package kafka.message import java.nio.ByteBuffer -import java.nio.channels.GatheringByteChannel +import java.nio.channels.{FileChannel, GatheringByteChannel} +import java.nio.file.StandardOpenOption import org.junit.Assert._ import kafka.utils.TestUtils._ @@ -118,14 +119,13 @@ trait BaseMessageSetTestCases extends JUnitSuite { // do the write twice to ensure the message set is restored to its original state for (_ <- 0 to 1) { val file = tempFile() - val fileRecords = FileRecords.open(file, true) + val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE) try { - val written = write(fileRecords.channel) - fileRecords.resize() // resize since we wrote to the channel directly - + val written = write(channel) assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written) + val fileRecords = new FileRecords(file, channel, 0, Integer.MAX_VALUE, false) assertEquals(set.asRecords.deepEntries.asScala.toVector, fileRecords.deepEntries.asScala.toVector) - } finally fileRecords.close() + } finally channel.close() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 0a03cac..348bfc3 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -75,11 +75,11 @@ class IsrExpirationTest { val leaderReplica = partition0.getReplica(configs.head.brokerId).get // let the follower catch up to the Leader logEndOffset (15) - for(replica <- partition0.assignedReplicas() - leaderReplica) + for (replica <- partition0.assignedReplicas - leaderReplica) replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), hw = 15L, leaderLogEndOffset = 15L, - fetchTimeMs =time.milliseconds, + fetchTimeMs = time.milliseconds, readSize = -1)) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -127,11 +127,11 @@ class IsrExpirationTest { val leaderReplica = partition0.getReplica(configs.head.brokerId).get // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms - for(replica <- partition0.assignedReplicas() - leaderReplica) + for (replica <- partition0.assignedReplicas - leaderReplica) replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY), hw = 10L, leaderLogEndOffset = 15L, - fetchTimeMs =time.milliseconds, + fetchTimeMs = time.milliseconds, readSize = -1)) // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log. @@ -141,12 +141,13 @@ class IsrExpirationTest { time.sleep(75) - (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), - hw = 11L, - leaderLogEndOffset = 15L, - fetchTimeMs =time.milliseconds, - readSize = -1))) + (partition0.assignedReplicas - leaderReplica).foreach { r => + r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY), + hw = 11L, + leaderLogEndOffset = 15L, + fetchTimeMs = time.milliseconds, + readSize = -1)) + } partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -157,12 +158,13 @@ class IsrExpirationTest { assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) // Now actually make a fetch to the end of the log. The replicas should be back in ISR - (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), - hw = 15L, - leaderLogEndOffset = 15L, - fetchTimeMs =time.milliseconds, - readSize = -1))) + (partition0.assignedReplicas - leaderReplica).foreach { r => + r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY), + hw = 15L, + leaderLogEndOffset = 15L, + fetchTimeMs = time.milliseconds, + readSize = -1)) + } partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -180,7 +182,7 @@ class IsrExpirationTest { // set in sync replicas for this partition to all the assigned replicas partition.inSyncReplicas = allReplicas.toSet // set lastCaughtUpTime to current time - for(replica <- partition.assignedReplicas() - leaderReplica) + for (replica <- partition.assignedReplicas - leaderReplica) replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY), hw = 0L, leaderLogEndOffset = 0L, http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 1ce17dc..cffb04d 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -32,7 +32,6 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.{MemoryRecords, Record} import org.easymock.EasyMock import org.junit.Assert._ -import scala.collection.JavaConverters._ class SimpleFetchTest { http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 2d9b9a5..ac83f1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -219,7 +219,7 @@ public class InternalTopicIntegrationTest { assertTrue(policies.contains(LogConfig.Compact())); assertTrue(policies.contains(LogConfig.Delete())); // retention should be 1 day + the window duration - final Long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs; - assertEquals(retention, Long.valueOf(properties.getProperty(LogConfig.RetentionMsProp()))); + final long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs; + assertEquals(retention, Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp()))); } }