ijuma commented on a change in pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#discussion_r511620727



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1099,171 +1099,169 @@ class Log(@volatile private var _dir: File,
       val appendInfo = analyzeAndValidateRecords(records, origin, 
ignoreRecordSize)
 
       // return if we have no valid messages or if this is a duplicate of the 
last appended entry
-      if (appendInfo.shallowCount == 0)
-        return appendInfo
+      if (appendInfo.shallowCount == 0) appendInfo
+      else {
 
-      // trim any invalid bytes or partial messages before appending it to the 
on-disk log
-      var validRecords = trimInvalidBytes(records, appendInfo)
+        // trim any invalid bytes or partial messages before appending it to 
the on-disk log
+        var validRecords = trimInvalidBytes(records, appendInfo)
 
-      // they are valid, insert them in the log
-      lock synchronized {
-        checkIfMemoryMappedBufferClosed()
-        if (assignOffsets) {
-          // assign offsets to the message set
-          val offset = new LongRef(nextOffsetMetadata.messageOffset)
-          appendInfo.firstOffset = Some(offset.value)
-          val now = time.milliseconds
-          val validateAndOffsetAssignResult = try {
-            LogValidator.validateMessagesAndAssignOffsets(validRecords,
-              topicPartition,
-              offset,
-              time,
-              now,
-              appendInfo.sourceCodec,
-              appendInfo.targetCodec,
-              config.compact,
-              config.messageFormatVersion.recordVersion.value,
-              config.messageTimestampType,
-              config.messageTimestampDifferenceMaxMs,
-              leaderEpoch,
-              origin,
-              interBrokerProtocolVersion,
-              brokerTopicStats)
-          } catch {
-            case e: IOException =>
-              throw new KafkaException(s"Error validating messages while 
appending to log $name", e)
-          }
-          validRecords = validateAndOffsetAssignResult.validatedRecords
-          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
-          appendInfo.offsetOfMaxTimestamp = 
validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
-          appendInfo.lastOffset = offset.value - 1
-          appendInfo.recordConversionStats = 
validateAndOffsetAssignResult.recordConversionStats
-          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
-            appendInfo.logAppendTime = now
-
-          // re-validate message sizes if there's a possibility that they have 
changed (due to re-compression or message
-          // format conversion)
-          if (!ignoreRecordSize && 
validateAndOffsetAssignResult.messageSizeMaybeChanged) {
-            for (batch <- validRecords.batches.asScala) {
-              if (batch.sizeInBytes > config.maxMessageSize) {
-                // we record the original message set size instead of the 
trimmed size
-                // to be consistent with pre-compression bytesRejectedRate 
recording
-                
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-                
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-                throw new RecordTooLargeException(s"Message batch size is 
${batch.sizeInBytes} bytes in append to" +
-                  s"partition $topicPartition which exceeds the maximum 
configured size of ${config.maxMessageSize}.")
-              }
+        // they are valid, insert them in the log
+        lock synchronized {
+          checkIfMemoryMappedBufferClosed()
+          if (assignOffsets) {
+            // assign offsets to the message set
+            val offset = new LongRef(nextOffsetMetadata.messageOffset)
+            appendInfo.firstOffset = Some(offset.value)
+            val now = time.milliseconds
+            val validateAndOffsetAssignResult = try {
+              LogValidator.validateMessagesAndAssignOffsets(validRecords,
+                topicPartition,
+                offset,
+                time,
+                now,
+                appendInfo.sourceCodec,
+                appendInfo.targetCodec,
+                config.compact,
+                config.messageFormatVersion.recordVersion.value,
+                config.messageTimestampType,
+                config.messageTimestampDifferenceMaxMs,
+                leaderEpoch,
+                origin,
+                interBrokerProtocolVersion,
+                brokerTopicStats)
+            } catch {
+              case e: IOException =>
+                throw new KafkaException(s"Error validating messages while 
appending to log $name", e)
             }
-          }
-        } else {
-          // we are taking the offsets we are given
-          if (!appendInfo.offsetsMonotonic)
-            throw new OffsetsOutOfOrderException(s"Out of order offsets found 
in append to $topicPartition: " +
-                                                 
records.records.asScala.map(_.offset))
-
-          if (appendInfo.firstOrLastOffsetOfFirstBatch < 
nextOffsetMetadata.messageOffset) {
-            // we may still be able to recover if the log is empty
-            // one example: fetching from log start offset on the leader which 
is not batch aligned,
-            // which may happen as a result of AdminClient#deleteRecords()
-            val firstOffset = appendInfo.firstOffset match {
-              case Some(offset) => offset
-              case None => records.batches.asScala.head.baseOffset()
+            validRecords = validateAndOffsetAssignResult.validatedRecords
+            appendInfo.maxTimestamp = 
validateAndOffsetAssignResult.maxTimestamp
+            appendInfo.offsetOfMaxTimestamp = 
validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
+            appendInfo.lastOffset = offset.value - 1
+            appendInfo.recordConversionStats = 
validateAndOffsetAssignResult.recordConversionStats
+            if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
+              appendInfo.logAppendTime = now
+
+            // re-validate message sizes if there's a possibility that they 
have changed (due to re-compression or message
+            // format conversion)
+            if (!ignoreRecordSize && 
validateAndOffsetAssignResult.messageSizeMaybeChanged) {
+              for (batch <- validRecords.batches.asScala) {
+                if (batch.sizeInBytes > config.maxMessageSize) {
+                  // we record the original message set size instead of the 
trimmed size
+                  // to be consistent with pre-compression bytesRejectedRate 
recording
+                  
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
+                  
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
+                  throw new RecordTooLargeException(s"Message batch size is 
${batch.sizeInBytes} bytes in append to" +
+                    s"partition $topicPartition which exceeds the maximum 
configured size of ${config.maxMessageSize}.")
+                }
+              }
             }
+          } else {
+            // we are taking the offsets we are given
+            if (!appendInfo.offsetsMonotonic)
+              throw new OffsetsOutOfOrderException(s"Out of order offsets 
found in append to $topicPartition: " +
+                records.records.asScala.map(_.offset))
+
+            if (appendInfo.firstOrLastOffsetOfFirstBatch < 
nextOffsetMetadata.messageOffset) {
+              // we may still be able to recover if the log is empty
+              // one example: fetching from log start offset on the leader 
which is not batch aligned,
+              // which may happen as a result of AdminClient#deleteRecords()
+              val firstOffset = appendInfo.firstOffset match {
+                case Some(offset) => offset
+                case None => records.batches.asScala.head.baseOffset()
+              }
 
-            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First 
offset" else "Last offset of the first batch"
-            throw new UnexpectedAppendOffsetException(
-              s"Unexpected offset in append to $topicPartition. $firstOrLast " 
+
-              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the 
next offset ${nextOffsetMetadata.messageOffset}. " +
-              s"First 10 offsets in append: 
${records.records.asScala.take(10).map(_.offset)}, last offset in" +
-              s" append: ${appendInfo.lastOffset}. Log start offset = 
$logStartOffset",
-              firstOffset, appendInfo.lastOffset)
+              val firstOrLast = if (appendInfo.firstOffset.isDefined) "First 
offset" else "Last offset of the first batch"
+              throw new UnexpectedAppendOffsetException(
+                s"Unexpected offset in append to $topicPartition. $firstOrLast 
" +
+                  s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than 
the next offset ${nextOffsetMetadata.messageOffset}. " +
+                  s"First 10 offsets in append: 
${records.records.asScala.take(10).map(_.offset)}, last offset in" +
+                  s" append: ${appendInfo.lastOffset}. Log start offset = 
$logStartOffset",
+                firstOffset, appendInfo.lastOffset)
+            }
           }
-        }
 
-        // update the epoch cache with the epoch stamped onto the message by 
the leader
-        validRecords.batches.forEach { batch =>
-          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
-            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, 
batch.baseOffset)
-          } else {
-            // In partial upgrade scenarios, we may get a temporary regression 
to the message format. In
-            // order to ensure the safety of leader election, we clear the 
epoch cache so that we revert
-            // to truncation by high watermark after the next leader election.
-            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
-              warn(s"Clearing leader epoch cache after unexpected append with 
message format v${batch.magic}")
-              cache.clearAndFlush()
+          // update the epoch cache with the epoch stamped onto the message by 
the leader
+          validRecords.batches.forEach { batch =>
+            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+              maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, 
batch.baseOffset)
+            } else {
+              // In partial upgrade scenarios, we may get a temporary 
regression to the message format. In
+              // order to ensure the safety of leader election, we clear the 
epoch cache so that we revert
+              // to truncation by high watermark after the next leader 
election.
+              leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
+                warn(s"Clearing leader epoch cache after unexpected append 
with message format v${batch.magic}")
+                cache.clearAndFlush()
+              }
             }
           }
-        }
 
-        // check messages set size may be exceed config.segmentSize
-        if (validRecords.sizeInBytes > config.segmentSize) {
-          throw new RecordBatchTooLargeException(s"Message batch size is 
${validRecords.sizeInBytes} bytes in append " +
-            s"to partition $topicPartition, which exceeds the maximum 
configured segment size of ${config.segmentSize}.")
-        }
+          // check messages set size may be exceed config.segmentSize
+          if (validRecords.sizeInBytes > config.segmentSize) {
+            throw new RecordBatchTooLargeException(s"Message batch size is 
${validRecords.sizeInBytes} bytes in append " +
+              s"to partition $topicPartition, which exceeds the maximum 
configured segment size of ${config.segmentSize}.")
+          }
 
-        // maybe roll the log if this segment is full
-        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
-
-        val logOffsetMetadata = LogOffsetMetadata(
-          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
-          segmentBaseOffset = segment.baseOffset,
-          relativePositionInSegment = segment.size)
-
-        // now that we have valid records, offsets assigned, and timestamps 
updated, we need to
-        // validate the idempotent/transactional state of the producers and 
collect some metadata
-        val (updatedProducers, completedTxns, maybeDuplicate) = 
analyzeAndValidateProducerState(
-          logOffsetMetadata, validRecords, origin)
-
-        maybeDuplicate.foreach { duplicate =>
-          appendInfo.firstOffset = Some(duplicate.firstOffset)
-          appendInfo.lastOffset = duplicate.lastOffset
-          appendInfo.logAppendTime = duplicate.timestamp
-          appendInfo.logStartOffset = logStartOffset
-          return appendInfo
-        }
+          // maybe roll the log if this segment is full
+          val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
 
-        segment.append(largestOffset = appendInfo.lastOffset,
-          largestTimestamp = appendInfo.maxTimestamp,
-          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
-          records = validRecords)
-
-        // Increment the log end offset. We do this immediately after the 
append because a
-        // write to the transaction index below may fail and we want to ensure 
that the offsets
-        // of future appends still grow monotonically. The resulting 
transaction index inconsistency
-        // will be cleaned up after the log directory is recovered. Note that 
the end offset of the
-        // ProducerStateManager will not be updated and the last stable offset 
will not advance
-        // if the append to the transaction index fails.
-        updateLogEndOffset(appendInfo.lastOffset + 1)
-
-        // update the producer state
-        for (producerAppendInfo <- updatedProducers.values) {
-          producerStateManager.update(producerAppendInfo)
-        }
+          val logOffsetMetadata = LogOffsetMetadata(
+            messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
+            segmentBaseOffset = segment.baseOffset,
+            relativePositionInSegment = segment.size)
 
-        // update the transaction index with the true last stable offset. The 
last offset visible
-        // to consumers using READ_COMMITTED will be limited by this value and 
the high watermark.
-        for (completedTxn <- completedTxns) {
-          val lastStableOffset = 
producerStateManager.lastStableOffset(completedTxn)
-          segment.updateTxnIndex(completedTxn, lastStableOffset)
-          producerStateManager.completeTxn(completedTxn)
-        }
+          // now that we have valid records, offsets assigned, and timestamps 
updated, we need to
+          // validate the idempotent/transactional state of the producers and 
collect some metadata
+          val (updatedProducers, completedTxns, maybeDuplicate) = 
analyzeAndValidateProducerState(
+            logOffsetMetadata, validRecords, origin)
 
-        // always update the last producer id map offset so that the snapshot 
reflects the current offset
-        // even if there isn't any idempotent data being written
-        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
+          if (maybeDuplicate.isDefined) {

Review comment:
       Seems like we should use a pattern match here instead of `isDefined` and 
`get`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to