ivanyu commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1280742713
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -621,10 +626,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
                 producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
-            remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+            int customMetadataSizeLimit = RemoteLogManager.this.rlmConfig.remoteLogMetadataCustomMetadataMaxSize();
+            if (customMetadata.isPresent()) {
+                long customMetadataSize = customMetadata.get().value().length;
+                if (customMetadataSize > customMetadataSizeLimit) {
+                    CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                    logger.error("Custom metadata size {} exceeds configured limit {}." +
+                            " Copying will be stopped and copied segment will be attempted to clean." +
+                            " Original metadata: {}",
+                            customMetadataSize, customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                    try {
+                        // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+                        // However, the update itself will not be stored in this case.
+                        remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
Review Comment:
Do you mean switching the segment state to `DELETE_SEGMENT_STARTED` and ultimately to `DELETE_SEGMENT_FINISHED`? The idea was to roll back to the state at L629, i.e. before calling `remoteLogStorageManager.copyLogSegmentData` (by undoing the copying), and let the operator decide what to do next. I analyzed the metadata flow, and it seems both approaches (deleting only the data and leaving the segment in `COPY_SEGMENT_STARTED`, which is the current one; and fully deleting the metadata, which is the one you propose) will lead to the same final result. However, the current one is a bit simpler and has fewer steps that could fail, so I think we should keep it unless I'm missing some factor. What do you think?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
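[Editor's note] The size-limit check and the rollback discussed in this thread can be sketched as follows. This is a simplified, hypothetical illustration: `StorageManager`, `copyWithLimit`, `CustomMetadataTooLargeException`, and the string segment id are stand-ins, not the actual Kafka `RemoteStorageManager` API or the PR's code.

```java
import java.util.Optional;

// Simplified sketch: copy a segment, then check the plugin-provided custom
// metadata against a size limit; on violation, undo the copy by deleting the
// copied data (leaving the segment in its pre-copy state, analogous to
// COPY_SEGMENT_STARTED) instead of storing the "copy finished" update.
public class CustomMetadataCheckSketch {

    static class CustomMetadataTooLargeException extends Exception {}

    // Stand-in for the remote storage plugin interface.
    interface StorageManager {
        // Returns optional plugin-provided custom metadata after copying.
        Optional<byte[]> copySegmentData(String segmentId);
        // Deletes the copied data; used here to undo a partial copy.
        void deleteSegmentData(String segmentId);
    }

    static void copyWithLimit(StorageManager rsm, String segmentId, int sizeLimit)
            throws CustomMetadataTooLargeException {
        Optional<byte[]> customMetadata = rsm.copySegmentData(segmentId);
        if (customMetadata.isPresent() && customMetadata.get().length > sizeLimit) {
            // Roll back: delete only the copied data and never store the
            // "copy finished" metadata update, so the segment stays in the
            // state it had before the copy was attempted.
            rsm.deleteSegmentData(segmentId);
            throw new CustomMetadataTooLargeException();
        }
        // Within the limit: the "copy finished" update would be stored here.
    }

    public static void main(String[] args) {
        StorageManager rsm = new StorageManager() {
            public Optional<byte[]> copySegmentData(String id) {
                return Optional.of(new byte[128]); // pretend the plugin returned 128 bytes
            }
            public void deleteSegmentData(String id) {
                System.out.println("deleted " + id);
            }
        };
        try {
            copyWithLimit(rsm, "segment-0", 64); // 128 > 64 triggers the rollback
        } catch (CustomMetadataTooLargeException e) {
            System.out.println("copy rolled back");
        }
    }
}
```

Either rollback strategy ends with no remote data and no stored "copy finished" update; the sketch follows the PR's choice because it has fewer remote calls that can themselves fail.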